On Wed, Jul 3, 2024, at 13:17, Dean Rasheed wrote:
> On Tue, 2 Jul 2024 at 21:10, Joel Jacobson <j...@compiler.org> wrote:
>>
>> I found the bug in the case 3 code,
>> and it turns out the same type of bug also exists in the case 2 code:
>>
>> case 2:
>> newdig = (int) var1digits[1] *
>> var2digits[res_ndigits - 4];
>>
>> The problem here is that res_ndigits could become less than 4,
>
> Yes. It can't be less than 3 though (per an earlier test), so the case
> 2 code was correct.
Hmm, I don't see how the case 2 code can be correct?
If, like you say, res_ndigits can't be less than 3, that means it can be 3,
right?
And if res_ndigits=3 then `var2digits[res_ndigits - 4]` would try to access
`var2digits[-1]`.
> I've been hacking on this a bit and trying to tidy it up. Firstly, I
> moved it to a separate function, because it was starting to look messy
> having so much extra code in mul_var(). Then I added a bunch more
> comments to explain what's going on, and the limits of the various
> variables. Note that most of the boundary checks are actually
> unnecessary -- in particular all the ones in or after the main loop,
> provided you pull out the first 2 result digits from the main loop in
> the 3-digit case. That does seem to work very well, but...
Nice, I was starting to feel a bit uncomfortable with the level of increased
complexity.
> I wasn't entirely happy with how messy that code is getting, so I
> tried a different approach. Similar to div_var_int(), I tried writing
> a mul_var_int() function instead. This can be used for 1 and 2 digit
> factors, and we could add a similar mul_var_int64() function on
> platforms with 128-bit integers. The code looks quite a lot neater, so
> it's probably less likely to contain bugs (though I have just written
> it in a hurry,so it might still have bugs). In testing, it seemed to
> give a decent speedup, but perhaps a little less than before. But
> that's to be balanced against having more maintainable code, and also
> a function that might be useful elsewhere in numeric.c.
>
> Anyway, here are both patches for comparison. I'll stop hacking for a
> while and let you see what you make of these.
I've tested both patches, and they produce the same output given the
same input as HEAD, when rscale is unmodified (full precision).
However, for a reduced rscale, there are some differences:
mul_var_small() seems more resilient to rscale reductions than mul_var_int().
The previous version we worked on is called "mul_var inlined" in the output
below.
```
-- One row per generated test case: the two numeric factors, the rscale
-- adjustment passed down to mul_var(), and the reference product
-- (filled in later from numeric_mul_head(), i.e. unpatched HEAD).
CREATE TABLE test_numeric_mul_patched (
var1 numeric,
var2 numeric,
rscale_adjustment int,
result numeric
);
-- Populate the test table: 1000 passes over every combination of factor
-- sizes (1..4 NBASE-10000 digits each), display scales (0..ndigits*4
-- decimal digits), and rscale reductions (0 down to -(dscale1+dscale2)),
-- so that all of mul_var()'s small-factor code paths get exercised.
DO $$
DECLARE
var1 numeric;
var2 numeric;
BEGIN
FOR i IN 1..1000 LOOP
RAISE NOTICE '%', i;
-- number of base-10000 digits in each factor
FOR var1ndigits IN 1..4 LOOP
FOR var2ndigits IN 1..4 LOOP
-- decimal digits after the point for each factor
FOR var1dscale IN 0..(var1ndigits*4) LOOP
FOR var2dscale IN 0..(var2ndigits*4) LOOP
-- every rscale reduction, down to dropping all fractional digits
FOR rscale_adjustment IN 0..(var1dscale+var2dscale) LOOP
-- random factor with exactly var1ndigits NBASE digits (lower bound
-- 10^((ndigits-1)*4-1) .. upper bound 10^(ndigits*4)-1), then shifted
-- right var1dscale decimal places and rounded to that scale
var1 := round(random(
format('1%s',repeat('0',(var1ndigits-1)*4-1))::numeric,
format('%s',repeat('9',var1ndigits*4))::numeric
) / 10::numeric^var1dscale, var1dscale);
var2 := round(random(
format('1%s',repeat('0',(var2ndigits-1)*4-1))::numeric,
format('%s',repeat('9',var2ndigits*4))::numeric
) / 10::numeric^var2dscale, var2dscale);
-- rscale_adjustment is stored negated: callers reduce rscale
INSERT INTO test_numeric_mul_patched
(var1, var2, rscale_adjustment)
VALUES
(var1, var2, -rscale_adjustment);
END LOOP;
END LOOP;
END LOOP;
END LOOP;
END LOOP;
END LOOP;
END $$;
-- Fill in the reference products using the unpatched (HEAD) mul_var().
UPDATE test_numeric_mul_patched SET result = numeric_mul_head(var1, var2,
rscale_adjustment);
-- Per-rscale_adjustment mismatch counts: for each patch variant, count the
-- rows whose product differs from the HEAD reference result.
SELECT
    rscale_adjustment,
    COUNT(*),
    COUNT(*) FILTER (WHERE result IS DISTINCT FROM
        numeric_mul_patch_int(var1,var2,rscale_adjustment)) AS "mul_var_int",
    COUNT(*) FILTER (WHERE result IS DISTINCT FROM
        numeric_mul_patch_small(var1,var2,rscale_adjustment)) AS "mul_var_small",
    COUNT(*) FILTER (WHERE result IS DISTINCT FROM
        numeric_mul_patch_inline(var1,var2,rscale_adjustment)) AS "mul_var inlined"
FROM test_numeric_mul_patched
GROUP BY rscale_adjustment
ORDER BY rscale_adjustment;
rscale_adjustment | count | mul_var_int | mul_var_small | mul_var inlined
-------------------+---------+-------------+---------------+-----------------
-32 | 1000 | 0 | 0 | 0
-31 | 3000 | 0 | 0 | 0
-30 | 6000 | 0 | 0 | 0
-29 | 10000 | 0 | 0 | 0
-28 | 17000 | 0 | 0 | 0
-27 | 27000 | 0 | 0 | 0
-26 | 40000 | 0 | 1 | 0
-25 | 56000 | 1 | 11 | 0
-24 | 78000 | 316 | 119 | 1
-23 | 106000 | 498 | 1696 | 0
-22 | 140000 | 531 | 2480 | 1
-21 | 180000 | 591 | 3145 | 0
-20 | 230000 | 1956 | 5309 | 1
-19 | 290000 | 2189 | 5032 | 0
-18 | 360000 | 2314 | 4868 | 0
-17 | 440000 | 2503 | 4544 | 1
-16 | 533000 | 5201 | 3633 | 0
-15 | 631000 | 5621 | 3006 | 0
-14 | 734000 | 5907 | 2631 | 0
-13 | 842000 | 6268 | 2204 | 0
-12 | 957000 | 9558 | 778 | 0
-11 | 1071000 | 10597 | 489 | 0
-10 | 1184000 | 10765 | 193 | 0
-9 | 1296000 | 9452 | 0 | 0
-8 | 1408000 | 1142 | 0 | 0
-7 | 1512000 | 391 | 0 | 0
-6 | 1608000 | 235 | 0 | 0
-5 | 1696000 | 0 | 0 | 0
-4 | 1776000 | 0 | 0 | 0
-3 | 1840000 | 0 | 0 | 0
-2 | 1888000 | 0 | 0 | 0
-1 | 1920000 | 0 | 0 | 0
0 | 1936000 | 0 | 0 | 0
(33 rows)
-- Distribution of (reference - patch) differences for the mul_var_int()
-- patch; non-zero buckets show how far off the truncated computation gets.
SELECT
result - numeric_mul_patch_int(var1,var2,rscale_adjustment),
COUNT(*)
FROM test_numeric_mul_patched
GROUP BY 1
ORDER BY 1;
?column? | count
----------------+----------
0 | 24739964
0.000000000001 | 2170
0.00000000001 | 234
0.0000000001 | 18
0.000000001 | 4
0.00000001 | 8927
0.0000001 | 882
0.000001 | 90
0.00001 | 6
0.0001 | 21963
0.001 | 2174
0.01 | 214
0.1 | 18
1 | 39336
(14 rows)
-- Same distribution for the mul_var_small() patch; note it can err in
-- both directions, unlike mul_var_int().
SELECT
result - numeric_mul_patch_small(var1,var2,rscale_adjustment),
COUNT(*)
FROM test_numeric_mul_patched
GROUP BY 1
ORDER BY 1;
?column? | count
-------------------+----------
-1 | 1233
-0.01 | 9
-0.001 | 73
-0.0001 | 647
-0.000001 | 2
-0.0000001 | 9
-0.00000001 | 116
0.000000000000000 | 24775861
0.00000001 | 1035
0.00000002 | 2
0.0000001 | 96
0.000001 | 9
0.0001 | 8771
0.0002 | 3
0.001 | 952
0.01 | 69
0.1 | 10
1 | 27098
2 | 5
(19 rows)
-- Same distribution for the inlined mul_var variant.
SELECT
result - numeric_mul_patch_inline(var1,var2,rscale_adjustment),
COUNT(*)
FROM test_numeric_mul_patched
GROUP BY 1
ORDER BY 1;
?column? | count
----------+----------
-1 | 4
0 | 24815996
(2 rows)
```
I found these two interesting to look closer at:
```
0.00000002 | 2
0.0002 | 3
-- Drill into the cases where mul_var_small() is off by 2 units in the
-- last place, to inspect the inputs that trigger them.
SELECT
*,
numeric_mul_patch_small(var1,var2,rscale_adjustment)
FROM test_numeric_mul_patched
WHERE result - numeric_mul_patch_small(var1,var2,rscale_adjustment) IN
(0.00000002, 0.0002);
var1 | var2 | rscale_adjustment | result |
numeric_mul_patch_small
-------------------+----------------+-------------------+------------+-------------------------
8952.12658563 | 0.902315486665 | -16 | 8077.6425 |
8077.6423
0.881715409579 | 0.843165739371 | -16 | 0.74343223 |
0.74343221
0.905322758954 | 0.756905996850 | -16 | 0.68524423 |
0.68524421
8464.043170546608 | 0.518100129611 | -20 | 4385.2219 |
4385.2217
5253.006296984449 | 0.989308019355 | -20 | 5196.8413 |
5196.8411
(5 rows)
```
What can be said about mul_var()'s contract with regards to rscale?
It's the number of decimal digits requested by the caller, and if not
requesting full precision, then the decimal digits might not be accurate,
but can something be said about how far off they can be?
The mul_var_int() patch only produces a difference that is exactly
1 less than the exact result, at the last non-zero decimal digit.
Could the difference be more than 1 at the last non-zero digit,
like in the five cases found above?
It would be nice if we could define mul_var()'s contract with regards to
rscale, in terms of what precision can be expected in the result.
Attaching the hacked-together version with all the patches, used to do the
testing above.
Regards,
Joel
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 5510a203b0..63de5cd994 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -551,6 +551,23 @@ static void sub_var(const NumericVar *var1, const
NumericVar *var2,
static void mul_var(const NumericVar *var1, const NumericVar *var2,
NumericVar *result,
int rscale);
+static void mul_var_head(const NumericVar *var1, const NumericVar *var2,
+ NumericVar *result,
+ int rscale);
+static void mul_var_patch_inline(const NumericVar *var1, const NumericVar
*var2,
+ NumericVar *result,
+ int rscale);
+static void mul_var_patch_small(const NumericVar *var1, const NumericVar *var2,
+ NumericVar *result,
+ int rscale);
+static void mul_var_patch_int(const NumericVar *var1, const NumericVar *var2,
+ NumericVar *result,
+ int rscale);
+static void mul_var_small(const NumericVar *var1, const NumericVar *var2,
+ NumericVar *result, int
rscale);
+
+static void mul_var_int(const NumericVar *var, int ival, int ival_weight,
+ NumericVar *result, int rscale);
static void div_var(const NumericVar *var1, const NumericVar *var2,
NumericVar *result,
int rscale, bool round);
@@ -3115,6 +3132,448 @@ numeric_mul_opt_error(Numeric num1, Numeric num2, bool
*have_error)
}
+Datum
+numeric_mul_head(PG_FUNCTION_ARGS)
+{
+ Numeric num1 = PG_GETARG_NUMERIC(0);
+ Numeric num2 = PG_GETARG_NUMERIC(1);
+ int32 rscale_adjustment = PG_GETARG_INT32(2);
+ Numeric res;
+
+ res = numeric_mul_head_opt_error(num1, num2, rscale_adjustment, NULL);
+
+ PG_RETURN_NUMERIC(res);
+}
+
+
+Datum
+numeric_mul_patch_inline(PG_FUNCTION_ARGS)
+{
+ Numeric num1 = PG_GETARG_NUMERIC(0);
+ Numeric num2 = PG_GETARG_NUMERIC(1);
+ int32 rscale_adjustment = PG_GETARG_INT32(2);
+ Numeric res;
+
+ res = numeric_mul_patch_inline_opt_error(num1, num2, rscale_adjustment,
NULL);
+
+ PG_RETURN_NUMERIC(res);
+}
+
+Datum
+numeric_mul_patch_small(PG_FUNCTION_ARGS)
+{
+ Numeric num1 = PG_GETARG_NUMERIC(0);
+ Numeric num2 = PG_GETARG_NUMERIC(1);
+ int32 rscale_adjustment = PG_GETARG_INT32(2);
+ Numeric res;
+
+ res = numeric_mul_patch_small_opt_error(num1, num2, rscale_adjustment,
NULL);
+
+ PG_RETURN_NUMERIC(res);
+}
+
+Datum
+numeric_mul_patch_int(PG_FUNCTION_ARGS)
+{
+ Numeric num1 = PG_GETARG_NUMERIC(0);
+ Numeric num2 = PG_GETARG_NUMERIC(1);
+ int32 rscale_adjustment = PG_GETARG_INT32(2);
+ Numeric res;
+
+ res = numeric_mul_patch_int_opt_error(num1, num2, rscale_adjustment,
NULL);
+
+ PG_RETURN_NUMERIC(res);
+}
+
+
+Numeric
+numeric_mul_patch_inline_opt_error(Numeric num1, Numeric num2, int32
rscale_adjustment, bool *have_error)
+{
+ NumericVar arg1;
+ NumericVar arg2;
+ NumericVar result;
+ Numeric res;
+
+ /*
+ * Handle NaN and infinities
+ */
+ if (NUMERIC_IS_SPECIAL(num1) || NUMERIC_IS_SPECIAL(num2))
+ {
+ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
+ return make_result(&const_nan);
+ if (NUMERIC_IS_PINF(num1))
+ {
+ switch (numeric_sign_internal(num2))
+ {
+ case 0:
+ return make_result(&const_nan); /* Inf
* 0 */
+ case 1:
+ return make_result(&const_pinf);
+ case -1:
+ return make_result(&const_ninf);
+ }
+ Assert(false);
+ }
+ if (NUMERIC_IS_NINF(num1))
+ {
+ switch (numeric_sign_internal(num2))
+ {
+ case 0:
+ return make_result(&const_nan); /* -Inf
* 0 */
+ case 1:
+ return make_result(&const_ninf);
+ case -1:
+ return make_result(&const_pinf);
+ }
+ Assert(false);
+ }
+ /* by here, num1 must be finite, so num2 is not */
+ if (NUMERIC_IS_PINF(num2))
+ {
+ switch (numeric_sign_internal(num1))
+ {
+ case 0:
+ return make_result(&const_nan); /* 0 *
Inf */
+ case 1:
+ return make_result(&const_pinf);
+ case -1:
+ return make_result(&const_ninf);
+ }
+ Assert(false);
+ }
+ Assert(NUMERIC_IS_NINF(num2));
+ switch (numeric_sign_internal(num1))
+ {
+ case 0:
+ return make_result(&const_nan); /* 0 * -Inf */
+ case 1:
+ return make_result(&const_ninf);
+ case -1:
+ return make_result(&const_pinf);
+ }
+ Assert(false);
+ }
+
+ /*
+ * Unpack the values, let mul_var() compute the result and return it.
+ * Unlike add_var() and sub_var(), mul_var() will round its result. In
the
+ * case of numeric_mul(), which is invoked for the * operator on
numerics,
+ * we request exact representation for the product (rscale = sum(dscale
of
+ * arg1, dscale of arg2)). If the exact result has more digits after
the
+ * decimal point than can be stored in a numeric, we round it. Rounding
+ * after computing the exact result ensures that the final result is
+ * correctly rounded (rounding in mul_var() using a truncated product
+ * would not guarantee this).
+ */
+ init_var_from_num(num1, &arg1);
+ init_var_from_num(num2, &arg2);
+
+ init_var(&result);
+
+ mul_var_patch_inline(&arg1, &arg2, &result, arg1.dscale + arg2.dscale +
rscale_adjustment);
+
+ if (result.dscale > NUMERIC_DSCALE_MAX)
+ round_var(&result, NUMERIC_DSCALE_MAX);
+
+ res = make_result_opt_error(&result, have_error);
+
+ free_var(&result);
+
+ return res;
+}
+
+
+Numeric
+numeric_mul_head_opt_error(Numeric num1, Numeric num2, int32
rscale_adjustment, bool *have_error)
+{
+ NumericVar arg1;
+ NumericVar arg2;
+ NumericVar result;
+ Numeric res;
+
+ /*
+ * Handle NaN and infinities
+ */
+ if (NUMERIC_IS_SPECIAL(num1) || NUMERIC_IS_SPECIAL(num2))
+ {
+ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
+ return make_result(&const_nan);
+ if (NUMERIC_IS_PINF(num1))
+ {
+ switch (numeric_sign_internal(num2))
+ {
+ case 0:
+ return make_result(&const_nan); /* Inf
* 0 */
+ case 1:
+ return make_result(&const_pinf);
+ case -1:
+ return make_result(&const_ninf);
+ }
+ Assert(false);
+ }
+ if (NUMERIC_IS_NINF(num1))
+ {
+ switch (numeric_sign_internal(num2))
+ {
+ case 0:
+ return make_result(&const_nan); /* -Inf
* 0 */
+ case 1:
+ return make_result(&const_ninf);
+ case -1:
+ return make_result(&const_pinf);
+ }
+ Assert(false);
+ }
+ /* by here, num1 must be finite, so num2 is not */
+ if (NUMERIC_IS_PINF(num2))
+ {
+ switch (numeric_sign_internal(num1))
+ {
+ case 0:
+ return make_result(&const_nan); /* 0 *
Inf */
+ case 1:
+ return make_result(&const_pinf);
+ case -1:
+ return make_result(&const_ninf);
+ }
+ Assert(false);
+ }
+ Assert(NUMERIC_IS_NINF(num2));
+ switch (numeric_sign_internal(num1))
+ {
+ case 0:
+ return make_result(&const_nan); /* 0 * -Inf */
+ case 1:
+ return make_result(&const_ninf);
+ case -1:
+ return make_result(&const_pinf);
+ }
+ Assert(false);
+ }
+
+ /*
+ * Unpack the values, let mul_var() compute the result and return it.
+ * Unlike add_var() and sub_var(), mul_var() will round its result. In
the
+ * case of numeric_mul(), which is invoked for the * operator on
numerics,
+ * we request exact representation for the product (rscale = sum(dscale
of
+ * arg1, dscale of arg2)). If the exact result has more digits after
the
+ * decimal point than can be stored in a numeric, we round it. Rounding
+ * after computing the exact result ensures that the final result is
+ * correctly rounded (rounding in mul_var() using a truncated product
+ * would not guarantee this).
+ */
+ init_var_from_num(num1, &arg1);
+ init_var_from_num(num2, &arg2);
+
+ init_var(&result);
+
+ mul_var_head(&arg1, &arg2, &result, arg1.dscale + arg2.dscale +
rscale_adjustment);
+
+ if (result.dscale > NUMERIC_DSCALE_MAX)
+ round_var(&result, NUMERIC_DSCALE_MAX);
+
+ res = make_result_opt_error(&result, have_error);
+
+ free_var(&result);
+
+ return res;
+}
+
+
+Numeric
+numeric_mul_patch_small_opt_error(Numeric num1, Numeric num2, int32
rscale_adjustment, bool *have_error)
+{
+ NumericVar arg1;
+ NumericVar arg2;
+ NumericVar result;
+ Numeric res;
+
+ /*
+ * Handle NaN and infinities
+ */
+ if (NUMERIC_IS_SPECIAL(num1) || NUMERIC_IS_SPECIAL(num2))
+ {
+ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
+ return make_result(&const_nan);
+ if (NUMERIC_IS_PINF(num1))
+ {
+ switch (numeric_sign_internal(num2))
+ {
+ case 0:
+ return make_result(&const_nan); /* Inf
* 0 */
+ case 1:
+ return make_result(&const_pinf);
+ case -1:
+ return make_result(&const_ninf);
+ }
+ Assert(false);
+ }
+ if (NUMERIC_IS_NINF(num1))
+ {
+ switch (numeric_sign_internal(num2))
+ {
+ case 0:
+ return make_result(&const_nan); /* -Inf
* 0 */
+ case 1:
+ return make_result(&const_ninf);
+ case -1:
+ return make_result(&const_pinf);
+ }
+ Assert(false);
+ }
+ /* by here, num1 must be finite, so num2 is not */
+ if (NUMERIC_IS_PINF(num2))
+ {
+ switch (numeric_sign_internal(num1))
+ {
+ case 0:
+ return make_result(&const_nan); /* 0 *
Inf */
+ case 1:
+ return make_result(&const_pinf);
+ case -1:
+ return make_result(&const_ninf);
+ }
+ Assert(false);
+ }
+ Assert(NUMERIC_IS_NINF(num2));
+ switch (numeric_sign_internal(num1))
+ {
+ case 0:
+ return make_result(&const_nan); /* 0 * -Inf */
+ case 1:
+ return make_result(&const_ninf);
+ case -1:
+ return make_result(&const_pinf);
+ }
+ Assert(false);
+ }
+
+ /*
+ * Unpack the values, let mul_var() compute the result and return it.
+ * Unlike add_var() and sub_var(), mul_var() will round its result. In
the
+ * case of numeric_mul(), which is invoked for the * operator on
numerics,
+ * we request exact representation for the product (rscale = sum(dscale
of
+ * arg1, dscale of arg2)). If the exact result has more digits after
the
+ * decimal point than can be stored in a numeric, we round it. Rounding
+ * after computing the exact result ensures that the final result is
+ * correctly rounded (rounding in mul_var() using a truncated product
+ * would not guarantee this).
+ */
+ init_var_from_num(num1, &arg1);
+ init_var_from_num(num2, &arg2);
+
+ init_var(&result);
+
+ mul_var_patch_small(&arg1, &arg2, &result, arg1.dscale + arg2.dscale +
rscale_adjustment);
+
+ if (result.dscale > NUMERIC_DSCALE_MAX)
+ round_var(&result, NUMERIC_DSCALE_MAX);
+
+ res = make_result_opt_error(&result, have_error);
+
+ free_var(&result);
+
+ return res;
+}
+
+
+Numeric
+numeric_mul_patch_int_opt_error(Numeric num1, Numeric num2, int32
rscale_adjustment, bool *have_error)
+{
+ NumericVar arg1;
+ NumericVar arg2;
+ NumericVar result;
+ Numeric res;
+
+ /*
+ * Handle NaN and infinities
+ */
+ if (NUMERIC_IS_SPECIAL(num1) || NUMERIC_IS_SPECIAL(num2))
+ {
+ if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
+ return make_result(&const_nan);
+ if (NUMERIC_IS_PINF(num1))
+ {
+ switch (numeric_sign_internal(num2))
+ {
+ case 0:
+ return make_result(&const_nan); /* Inf
* 0 */
+ case 1:
+ return make_result(&const_pinf);
+ case -1:
+ return make_result(&const_ninf);
+ }
+ Assert(false);
+ }
+ if (NUMERIC_IS_NINF(num1))
+ {
+ switch (numeric_sign_internal(num2))
+ {
+ case 0:
+ return make_result(&const_nan); /* -Inf
* 0 */
+ case 1:
+ return make_result(&const_ninf);
+ case -1:
+ return make_result(&const_pinf);
+ }
+ Assert(false);
+ }
+ /* by here, num1 must be finite, so num2 is not */
+ if (NUMERIC_IS_PINF(num2))
+ {
+ switch (numeric_sign_internal(num1))
+ {
+ case 0:
+ return make_result(&const_nan); /* 0 *
Inf */
+ case 1:
+ return make_result(&const_pinf);
+ case -1:
+ return make_result(&const_ninf);
+ }
+ Assert(false);
+ }
+ Assert(NUMERIC_IS_NINF(num2));
+ switch (numeric_sign_internal(num1))
+ {
+ case 0:
+ return make_result(&const_nan); /* 0 * -Inf */
+ case 1:
+ return make_result(&const_ninf);
+ case -1:
+ return make_result(&const_pinf);
+ }
+ Assert(false);
+ }
+
+ /*
+ * Unpack the values, let mul_var() compute the result and return it.
+ * Unlike add_var() and sub_var(), mul_var() will round its result. In
the
+ * case of numeric_mul(), which is invoked for the * operator on
numerics,
+ * we request exact representation for the product (rscale = sum(dscale
of
+ * arg1, dscale of arg2)). If the exact result has more digits after
the
+ * decimal point than can be stored in a numeric, we round it. Rounding
+ * after computing the exact result ensures that the final result is
+ * correctly rounded (rounding in mul_var() using a truncated product
+ * would not guarantee this).
+ */
+ init_var_from_num(num1, &arg1);
+ init_var_from_num(num2, &arg2);
+
+ init_var(&result);
+
+ mul_var_patch_int(&arg1, &arg2, &result, arg1.dscale + arg2.dscale +
rscale_adjustment);
+
+ if (result.dscale > NUMERIC_DSCALE_MAX)
+ round_var(&result, NUMERIC_DSCALE_MAX);
+
+ res = make_result_opt_error(&result, have_error);
+
+ free_var(&result);
+
+ return res;
+}
+
+
/*
* numeric_div() -
*
@@ -8864,6 +9323,1245 @@ mul_var(const NumericVar *var1, const NumericVar
*var2, NumericVar *result,
strip_var(result);
}
+static void
+mul_var_head(const NumericVar *var1, const NumericVar *var2, NumericVar
*result,
+ int rscale)
+{
+ int res_ndigits;
+ int res_sign;
+ int res_weight;
+ int maxdigits;
+ int *dig;
+ int carry;
+ int maxdig;
+ int newdig;
+ int var1ndigits;
+ int var2ndigits;
+ NumericDigit *var1digits;
+ NumericDigit *var2digits;
+ NumericDigit *res_digits;
+ int i,
+ i1,
+ i2;
+
+ /*
+ * Arrange for var1 to be the shorter of the two numbers. This improves
+ * performance because the inner multiplication loop is much simpler
than
+ * the outer loop, so it's better to have a smaller number of iterations
+ * of the outer loop. This also reduces the number of times that the
+ * accumulator array needs to be normalized.
+ */
+ if (var1->ndigits > var2->ndigits)
+ {
+ const NumericVar *tmp = var1;
+
+ var1 = var2;
+ var2 = tmp;
+ }
+
+ /* copy these values into local vars for speed in inner loop */
+ var1ndigits = var1->ndigits;
+ var2ndigits = var2->ndigits;
+ var1digits = var1->digits;
+ var2digits = var2->digits;
+
+ if (var1ndigits == 0 || var2ndigits == 0)
+ {
+ /* one or both inputs is zero; so is result */
+ zero_var(result);
+ result->dscale = rscale;
+ return;
+ }
+
+ /* Determine result sign and (maximum possible) weight */
+ if (var1->sign == var2->sign)
+ res_sign = NUMERIC_POS;
+ else
+ res_sign = NUMERIC_NEG;
+ res_weight = var1->weight + var2->weight + 2;
+
+ /*
+ * Determine the number of result digits to compute. If the exact
result
+ * would have more than rscale fractional digits, truncate the
computation
+ * with MUL_GUARD_DIGITS guard digits, i.e., ignore input digits that
+ * would only contribute to the right of that. (This will give the
exact
+ * rounded-to-rscale answer unless carries out of the ignored positions
+ * would have propagated through more than MUL_GUARD_DIGITS digits.)
+ *
+ * Note: an exact computation could not produce more than var1ndigits +
+ * var2ndigits digits, but we allocate one extra output digit in case
+ * rscale-driven rounding produces a carry out of the highest exact
digit.
+ */
+ res_ndigits = var1ndigits + var2ndigits + 1;
+ maxdigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS +
+ MUL_GUARD_DIGITS;
+ res_ndigits = Min(res_ndigits, maxdigits);
+
+ if (res_ndigits < 3)
+ {
+ /* All input digits will be ignored; so result is zero */
+ zero_var(result);
+ result->dscale = rscale;
+ return;
+ }
+
+ /*
+ * We do the arithmetic in an array "dig[]" of signed int's. Since
+ * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
+ * to avoid normalizing carries immediately.
+ *
+ * maxdig tracks the maximum possible value of any dig[] entry; when
this
+ * threatens to exceed INT_MAX, we take the time to propagate carries.
+ * Furthermore, we need to ensure that overflow doesn't occur during the
+ * carry propagation passes either. The carry values could be as much
as
+ * INT_MAX/NBASE, so really we must normalize when digits threaten to
+ * exceed INT_MAX - INT_MAX/NBASE.
+ *
+ * To avoid overflow in maxdig itself, it actually represents the max
+ * possible value divided by NBASE-1, ie, at the top of the loop it is
+ * known that no dig[] entry exceeds maxdig * (NBASE-1).
+ */
+ dig = (int *) palloc0(res_ndigits * sizeof(int));
+ maxdig = 0;
+
+ /*
+ * The least significant digits of var1 should be ignored if they don't
+ * contribute directly to the first res_ndigits digits of the result
that
+ * we are computing.
+ *
+ * Digit i1 of var1 and digit i2 of var2 are multiplied and added to
digit
+ * i1+i2+2 of the accumulator array, so we need only consider digits of
+ * var1 for which i1 <= res_ndigits - 3.
+ */
+ for (i1 = Min(var1ndigits - 1, res_ndigits - 3); i1 >= 0; i1--)
+ {
+ NumericDigit var1digit = var1digits[i1];
+
+ if (var1digit == 0)
+ continue;
+
+ /* Time to normalize? */
+ maxdig += var1digit;
+ if (maxdig > (INT_MAX - INT_MAX / NBASE) / (NBASE - 1))
+ {
+ /* Yes, do it */
+ carry = 0;
+ for (i = res_ndigits - 1; i >= 0; i--)
+ {
+ newdig = dig[i] + carry;
+ if (newdig >= NBASE)
+ {
+ carry = newdig / NBASE;
+ newdig -= carry * NBASE;
+ }
+ else
+ carry = 0;
+ dig[i] = newdig;
+ }
+ Assert(carry == 0);
+ /* Reset maxdig to indicate new worst-case */
+ maxdig = 1 + var1digit;
+ }
+
+ /*
+ * Add the appropriate multiple of var2 into the accumulator.
+ *
+ * As above, digits of var2 can be ignored if they don't
contribute,
+ * so we only include digits for which i1+i2+2 < res_ndigits.
+ *
+ * This inner loop is the performance bottleneck for
multiplication,
+ * so we want to keep it simple enough so that it can be
+ * auto-vectorized. Accordingly, process the digits
left-to-right
+ * even though schoolbook multiplication would suggest
right-to-left.
+ * Since we aren't propagating carries in this loop, the order
does
+ * not matter.
+ */
+ {
+ int i2limit = Min(var2ndigits,
res_ndigits - i1 - 2);
+ int *dig_i1_2 = &dig[i1 + 2];
+
+ for (i2 = 0; i2 < i2limit; i2++)
+ dig_i1_2[i2] += var1digit * var2digits[i2];
+ }
+ }
+
+ /*
+ * Now we do a final carry propagation pass to normalize the result,
which
+ * we combine with storing the result digits into the output. Note that
+ * this is still done at full precision w/guard digits.
+ */
+ alloc_var(result, res_ndigits);
+ res_digits = result->digits;
+ carry = 0;
+ for (i = res_ndigits - 1; i >= 0; i--)
+ {
+ newdig = dig[i] + carry;
+ if (newdig >= NBASE)
+ {
+ carry = newdig / NBASE;
+ newdig -= carry * NBASE;
+ }
+ else
+ carry = 0;
+ res_digits[i] = newdig;
+ }
+ Assert(carry == 0);
+
+ pfree(dig);
+
+ /*
+ * Finally, round the result to the requested precision.
+ */
+ result->weight = res_weight;
+ result->sign = res_sign;
+
+ /* Round to target rscale (and set result->dscale) */
+ round_var(result, rscale);
+
+ /* Strip leading and trailing zeroes */
+ strip_var(result);
+}
+
+static void
+mul_var_patch_inline(const NumericVar *var1, const NumericVar *var2,
+ NumericVar *result, int rscale)
+{
+ int res_ndigits;
+ int res_sign;
+ int res_weight;
+ int maxdigits;
+ int *dig;
+ int carry;
+ int maxdig;
+ int newdig;
+ int var1ndigits;
+ int var2ndigits;
+ NumericDigit *var1digits;
+ NumericDigit *var2digits;
+ NumericDigit *res_digits;
+ int i,
+ i1,
+ i2;
+
+ /*
+ * Arrange for var1 to be the shorter of the two numbers. This improves
+ * performance because the inner multiplication loop is much simpler
than
+ * the outer loop, so it's better to have a smaller number of iterations
+ * of the outer loop. This also reduces the number of times that the
+ * accumulator array needs to be normalized.
+ */
+ if (var1->ndigits > var2->ndigits)
+ {
+ const NumericVar *tmp = var1;
+
+ var1 = var2;
+ var2 = tmp;
+ }
+
+ /* copy these values into local vars for speed in inner loop */
+ var1ndigits = var1->ndigits;
+ var2ndigits = var2->ndigits;
+ var1digits = var1->digits;
+ var2digits = var2->digits;
+
+ if (var1ndigits == 0 || var2ndigits == 0)
+ {
+ /* one or both inputs is zero; so is result */
+ zero_var(result);
+ result->dscale = rscale;
+ return;
+ }
+
+ /* Determine result sign and (maximum possible) weight */
+ if (var1->sign == var2->sign)
+ res_sign = NUMERIC_POS;
+ else
+ res_sign = NUMERIC_NEG;
+ res_weight = var1->weight + var2->weight + 2;
+
+ /*
+ * Determine the number of result digits to compute. If the exact
result
+ * would have more than rscale fractional digits, truncate the
computation
+ * with MUL_GUARD_DIGITS guard digits, i.e., ignore input digits that
+ * would only contribute to the right of that. (This will give the
exact
+ * rounded-to-rscale answer unless carries out of the ignored positions
+ * would have propagated through more than MUL_GUARD_DIGITS digits.)
+ *
+ * Note: an exact computation could not produce more than var1ndigits +
+ * var2ndigits digits, but we allocate one extra output digit in case
+ * rscale-driven rounding produces a carry out of the highest exact
digit.
+ */
+ res_ndigits = var1ndigits + var2ndigits + 1;
+ maxdigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS +
+ MUL_GUARD_DIGITS;
+ res_ndigits = Min(res_ndigits, maxdigits);
+
+ if (res_ndigits < 3)
+ {
+ /* All input digits will be ignored; so result is zero */
+ zero_var(result);
+ result->dscale = rscale;
+ return;
+ }
+
+ /*
+ * Simplified fast-path computation, if var1 has just one or two digits.
+ * This is significantly faster, since it avoids allocating a separate
+ * digit array, making multiple passes over var2, and having separate
+ * carry-propagation passes.
+ */
+ if (var1ndigits <= 3)
+ {
+ NumericDigit *res_buf;
+
+ /* Allocate result digit array */
+ res_buf = digitbuf_alloc(res_ndigits);
+ res_buf[0] = 0; /* spare digit for
later rounding */
+ res_digits = res_buf + 1;
+
+ /*
+ * Compute the result digits directly, in one pass, propagating
the
+ * carry up as we go.
+ */
+ switch (var1ndigits)
+ {
+ case 1:
+ carry = 0;
+ for (i = res_ndigits - 3; i >= 0; i--)
+ {
+ newdig = (int) var1digits[0] *
var2digits[i] + carry;
+ res_digits[i + 1] = (NumericDigit)
(newdig % NBASE);
+ carry = newdig / NBASE;
+ }
+ res_digits[0] = (NumericDigit) carry;
+ break;
+
+ case 2:
+ if (res_ndigits - 4 >= 0 && res_ndigits - 4 <
var2ndigits)
+ newdig = (int) var1digits[1] *
var2digits[res_ndigits - 4];
+ else
+ newdig = 0;
+ if (res_ndigits - 3 >= 0 && res_ndigits - 3 <
var2ndigits)
+ newdig += (int) var1digits[0] *
var2digits[res_ndigits - 3];
+ res_digits[res_ndigits - 2] = (NumericDigit)
(newdig % NBASE);
+ carry = newdig / NBASE;
+ for (i = res_ndigits - 4; i >= 1; i--)
+ {
+ newdig = (int) var1digits[0] *
var2digits[i] +
+ (int) var1digits[1] *
var2digits[i - 1] + carry;
+ res_digits[i + 1] = (NumericDigit)
(newdig % NBASE);
+ carry = newdig / NBASE;
+ }
+ newdig = (int) var1digits[0] * var2digits[0] +
carry;
+ res_digits[1] = (NumericDigit) (newdig % NBASE);
+ res_digits[0] = (NumericDigit) (newdig / NBASE);
+ break;
+
+ case 3:
+ if (res_ndigits - 5 >= 0 && res_ndigits - 5 <
var2ndigits)
+ newdig = (int) var1digits[2] *
var2digits[res_ndigits - 5];
+ else
+ newdig = 0;
+ if (res_ndigits - 4 >= 0 && res_ndigits - 4 <
var2ndigits)
+ newdig += (int) var1digits[1] *
var2digits[res_ndigits - 4];
+ if (res_ndigits - 3 >= 0 && res_ndigits - 3 <
var2ndigits)
+ newdig += (int) var1digits[0] *
var2digits[res_ndigits - 3];
+ res_digits[res_ndigits - 2] = (NumericDigit)
(newdig % NBASE);
+ carry = newdig / NBASE;
+ for (i = res_ndigits - 4; i >= 2; i--)
+ {
+ newdig = carry;
+ if (i < var2ndigits)
+ newdig += (int) var1digits[0] *
var2digits[i];
+ if (i - 1 >= 0 && i - 1 < var2ndigits)
+ newdig += (int) var1digits[1] *
var2digits[i - 1];
+ if (i - 2 >= 0 && i - 2 < var2ndigits)
+ newdig += (int) var1digits[2] *
var2digits[i - 2];
+ res_digits[i + 1] = (NumericDigit)
(newdig % NBASE);
+ carry = newdig / NBASE;
+ }
+ newdig = carry;
+ if (var2ndigits > 1)
+ newdig += (int) var1digits[0] *
var2digits[1];
+ if (var2ndigits > 0)
+ newdig += (int) var1digits[1] *
var2digits[0];
+ res_digits[2] = (NumericDigit) (newdig % NBASE);
+ carry = newdig / NBASE;
+ newdig = (int) var1digits[0] * var2digits[0] +
carry;
+ res_digits[1] = (NumericDigit) (newdig % NBASE);
+ res_digits[0] = (NumericDigit) (newdig / NBASE);
+ break;
+ }
+
+ /* Store the product in result (minus extra rounding digit) */
+ digitbuf_free(result->buf);
+ result->ndigits = res_ndigits - 1;
+ result->buf = res_buf;
+ result->digits = res_digits;
+ result->weight = res_weight - 1;
+ result->sign = res_sign;
+
+ /* Round to target rscale (and set result->dscale) */
+ round_var(result, rscale);
+
+ /* Strip leading and trailing zeroes */
+ strip_var(result);
+
+ return;
+ }
+
+ /*
+ * We do the arithmetic in an array "dig[]" of signed int's. Since
+ * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
+ * to avoid normalizing carries immediately.
+ *
+ * maxdig tracks the maximum possible value of any dig[] entry; when
this
+ * threatens to exceed INT_MAX, we take the time to propagate carries.
+ * Furthermore, we need to ensure that overflow doesn't occur during the
+ * carry propagation passes either. The carry values could be as much
as
+ * INT_MAX/NBASE, so really we must normalize when digits threaten to
+ * exceed INT_MAX - INT_MAX/NBASE.
+ *
+ * To avoid overflow in maxdig itself, it actually represents the max
+ * possible value divided by NBASE-1, ie, at the top of the loop it is
+ * known that no dig[] entry exceeds maxdig * (NBASE-1).
+ */
+ dig = (int *) palloc0(res_ndigits * sizeof(int));
+ maxdig = 0;
+
+ /*
+ * The least significant digits of var1 should be ignored if they don't
+ * contribute directly to the first res_ndigits digits of the result
that
+ * we are computing.
+ *
+ * Digit i1 of var1 and digit i2 of var2 are multiplied and added to
digit
+ * i1+i2+2 of the accumulator array, so we need only consider digits of
+ * var1 for which i1 <= res_ndigits - 3.
+ */
+ for (i1 = Min(var1ndigits - 1, res_ndigits - 3); i1 >= 0; i1--)
+ {
+ NumericDigit var1digit = var1digits[i1];
+
+ if (var1digit == 0)
+ continue;
+
+ /* Time to normalize? */
+ maxdig += var1digit;
+ if (maxdig > (INT_MAX - INT_MAX / NBASE) / (NBASE - 1))
+ {
+ /* Yes, do it */
+ carry = 0;
+ for (i = res_ndigits - 1; i >= 0; i--)
+ {
+ newdig = dig[i] + carry;
+ if (newdig >= NBASE)
+ {
+ carry = newdig / NBASE;
+ newdig -= carry * NBASE;
+ }
+ else
+ carry = 0;
+ dig[i] = newdig;
+ }
+ Assert(carry == 0);
+ /* Reset maxdig to indicate new worst-case */
+ maxdig = 1 + var1digit;
+ }
+
+ /*
+ * Add the appropriate multiple of var2 into the accumulator.
+ *
+ * As above, digits of var2 can be ignored if they don't
contribute,
+ * so we only include digits for which i1+i2+2 < res_ndigits.
+ *
+ * This inner loop is the performance bottleneck for
multiplication,
+ * so we want to keep it simple enough so that it can be
+ * auto-vectorized. Accordingly, process the digits
left-to-right
+ * even though schoolbook multiplication would suggest
right-to-left.
+ * Since we aren't propagating carries in this loop, the order
does
+ * not matter.
+ */
+ {
+ int i2limit = Min(var2ndigits,
res_ndigits - i1 - 2);
+ int *dig_i1_2 = &dig[i1 + 2];
+
+ for (i2 = 0; i2 < i2limit; i2++)
+ {
+ dig_i1_2[i2] += var1digit * var2digits[i2];
+ }
+ }
+ }
+
+ /*
+ * Now we do a final carry propagation pass to normalize the result,
which
+ * we combine with storing the result digits into the output. Note that
+ * this is still done at full precision w/guard digits.
+ */
+ alloc_var(result, res_ndigits);
+ res_digits = result->digits;
+ carry = 0;
+ for (i = res_ndigits - 1; i >= 0; i--)
+ {
+ newdig = dig[i] + carry;
+ if (newdig >= NBASE)
+ {
+ carry = newdig / NBASE;
+ newdig -= carry * NBASE;
+ }
+ else
+ carry = 0;
+ res_digits[i] = newdig;
+ }
+ Assert(carry == 0);
+
+ pfree(dig);
+
+ /*
+ * Finally, round the result to the requested precision.
+ */
+ result->weight = res_weight;
+ result->sign = res_sign;
+
+ /* Round to target rscale (and set result->dscale) */
+ round_var(result, rscale);
+
+ /* Strip leading and trailing zeroes */
+ strip_var(result);
+
+}
+
+
/*
 * mul_var_patch_int() -
 *
 *	Compute the product of var1 and var2 to rscale fractional digits and
 *	store it in result.
 *
 *	Experimental variant of mul_var() that delegates to mul_var_int() when
 *	the shorter input has no more than two NBASE digits (and so fits in a
 *	32-bit integer); otherwise it uses the standard mul_var() algorithm.
 */
static void
mul_var_patch_int(const NumericVar *var1, const NumericVar *var2, NumericVar *result,
				  int rscale)
{
	int			res_ndigits;
	int			res_sign;
	int			res_weight;
	int			maxdigits;
	int		   *dig;
	int			carry;
	int			maxdig;
	int			newdig;
	int			var1ndigits;
	int			var2ndigits;
	NumericDigit *var1digits;
	NumericDigit *var2digits;
	NumericDigit *res_digits;
	int			i,
				i1,
				i2;

	/*
	 * Arrange for var1 to be the shorter of the two numbers.  This improves
	 * performance because the inner multiplication loop is much simpler than
	 * the outer loop, so it's better to have a smaller number of iterations
	 * of the outer loop.  This also reduces the number of times that the
	 * accumulator array needs to be normalized.
	 */
	if (var1->ndigits > var2->ndigits)
	{
		const NumericVar *tmp = var1;

		var1 = var2;
		var2 = tmp;
	}

	/* copy these values into local vars for speed in inner loop */
	var1ndigits = var1->ndigits;
	var2ndigits = var2->ndigits;
	var1digits = var1->digits;
	var2digits = var2->digits;

	if (var1ndigits == 0)
	{
		/* one or both inputs is zero; so is result */
		zero_var(result);
		result->dscale = rscale;
		return;
	}

	/*
	 * If var1 has just one or two digits, delegate to mul_var_int(), which
	 * uses a faster direct multiplication algorithm.
	 *
	 * TODO: Similarly, on platforms with 128-bit integers ...
	 */
	if (var1ndigits <= 2)
	{
		int			ifactor;
		int			ifactor_weight;

		/*
		 * Pack var1's digits into a plain int.  A 2-digit value is at most
		 * NBASE^2 - 1, which fits easily; combining the two digits shifts
		 * the effective weight down by one.
		 */
		ifactor = var1->digits[0];
		ifactor_weight = var1->weight;
		if (var1ndigits == 2)
		{
			ifactor = ifactor * NBASE + var1->digits[1];
			ifactor_weight--;
		}
		if (var1->sign == NUMERIC_NEG)
			ifactor = -ifactor;

		mul_var_int(var2, ifactor, ifactor_weight, result, rscale);
		return;
	}

	/* Determine result sign and (maximum possible) weight */
	if (var1->sign == var2->sign)
		res_sign = NUMERIC_POS;
	else
		res_sign = NUMERIC_NEG;
	res_weight = var1->weight + var2->weight + 2;

	/*
	 * Determine the number of result digits to compute.  If the exact result
	 * would have more than rscale fractional digits, truncate the computation
	 * with MUL_GUARD_DIGITS guard digits, i.e., ignore input digits that
	 * would only contribute to the right of that.  (This will give the exact
	 * rounded-to-rscale answer unless carries out of the ignored positions
	 * would have propagated through more than MUL_GUARD_DIGITS digits.)
	 *
	 * Note: an exact computation could not produce more than var1ndigits +
	 * var2ndigits digits, but we allocate one extra output digit in case
	 * rscale-driven rounding produces a carry out of the highest exact digit.
	 */
	res_ndigits = var1ndigits + var2ndigits + 1;
	maxdigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS +
		MUL_GUARD_DIGITS;
	res_ndigits = Min(res_ndigits, maxdigits);

	if (res_ndigits < 3)
	{
		/* All input digits will be ignored; so result is zero */
		zero_var(result);
		result->dscale = rscale;
		return;
	}

	/*
	 * We do the arithmetic in an array "dig[]" of signed int's.  Since
	 * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
	 * to avoid normalizing carries immediately.
	 *
	 * maxdig tracks the maximum possible value of any dig[] entry; when this
	 * threatens to exceed INT_MAX, we take the time to propagate carries.
	 * Furthermore, we need to ensure that overflow doesn't occur during the
	 * carry propagation passes either.  The carry values could be as much as
	 * INT_MAX/NBASE, so really we must normalize when digits threaten to
	 * exceed INT_MAX - INT_MAX/NBASE.
	 *
	 * To avoid overflow in maxdig itself, it actually represents the max
	 * possible value divided by NBASE-1, ie, at the top of the loop it is
	 * known that no dig[] entry exceeds maxdig * (NBASE-1).
	 */
	dig = (int *) palloc0(res_ndigits * sizeof(int));
	maxdig = 0;

	/*
	 * The least significant digits of var1 should be ignored if they don't
	 * contribute directly to the first res_ndigits digits of the result that
	 * we are computing.
	 *
	 * Digit i1 of var1 and digit i2 of var2 are multiplied and added to digit
	 * i1+i2+2 of the accumulator array, so we need only consider digits of
	 * var1 for which i1 <= res_ndigits - 3.
	 */
	for (i1 = Min(var1ndigits - 1, res_ndigits - 3); i1 >= 0; i1--)
	{
		NumericDigit var1digit = var1digits[i1];

		if (var1digit == 0)
			continue;

		/* Time to normalize? */
		maxdig += var1digit;
		if (maxdig > (INT_MAX - INT_MAX / NBASE) / (NBASE - 1))
		{
			/* Yes, do it */
			carry = 0;
			for (i = res_ndigits - 1; i >= 0; i--)
			{
				newdig = dig[i] + carry;
				if (newdig >= NBASE)
				{
					carry = newdig / NBASE;
					newdig -= carry * NBASE;
				}
				else
					carry = 0;
				dig[i] = newdig;
			}
			Assert(carry == 0);
			/* Reset maxdig to indicate new worst-case */
			maxdig = 1 + var1digit;
		}

		/*
		 * Add the appropriate multiple of var2 into the accumulator.
		 *
		 * As above, digits of var2 can be ignored if they don't contribute,
		 * so we only include digits for which i1+i2+2 < res_ndigits.
		 *
		 * This inner loop is the performance bottleneck for multiplication,
		 * so we want to keep it simple enough so that it can be
		 * auto-vectorized.  Accordingly, process the digits left-to-right
		 * even though schoolbook multiplication would suggest right-to-left.
		 * Since we aren't propagating carries in this loop, the order does
		 * not matter.
		 */
		{
			int			i2limit = Min(var2ndigits, res_ndigits - i1 - 2);
			int		   *dig_i1_2 = &dig[i1 + 2];

			for (i2 = 0; i2 < i2limit; i2++)
				dig_i1_2[i2] += var1digit * var2digits[i2];
		}
	}

	/*
	 * Now we do a final carry propagation pass to normalize the result, which
	 * we combine with storing the result digits into the output.  Note that
	 * this is still done at full precision w/guard digits.
	 */
	alloc_var(result, res_ndigits);
	res_digits = result->digits;
	carry = 0;
	for (i = res_ndigits - 1; i >= 0; i--)
	{
		newdig = dig[i] + carry;
		if (newdig >= NBASE)
		{
			carry = newdig / NBASE;
			newdig -= carry * NBASE;
		}
		else
			carry = 0;
		res_digits[i] = newdig;
	}
	Assert(carry == 0);

	pfree(dig);

	/*
	 * Finally, round the result to the requested precision.
	 */
	result->weight = res_weight;
	result->sign = res_sign;

	/* Round to target rscale (and set result->dscale) */
	round_var(result, rscale);

	/* Strip leading and trailing zeroes */
	strip_var(result);
}
+
+/*
+ * mul_var_int() -
+ *
+ * Multiply a numeric variable by a 32-bit integer with the specified
weight.
+ * The product var * ival * NBASE^ival_weight is stored in result.
+ */
+static void
+mul_var_int(const NumericVar *var, int ival, int ival_weight,
+ NumericVar *result, int rscale)
+{
+ NumericDigit *var_digits = var->digits;
+ int var_ndigits = var->ndigits;
+ int res_sign;
+ int res_weight;
+ int res_ndigits;
+ int maxdigits;
+ NumericDigit *res_buf;
+ NumericDigit *res_digits;
+ uint32 factor;
+ uint32 carry;
+
+ if (ival == 0 || var_ndigits == 0)
+ {
+ zero_var(result);
+ result->dscale = rscale;
+ return;
+ }
+
+ /*
+ * Determine the result sign, (maximum possible) weight and number of
+ * digits to calculate. The weight figured here is correct if the
emitted
+ * product has no leading zero digits; otherwise strip_var() will fix
+ * things up.
+ */
+ if (var->sign == NUMERIC_POS)
+ res_sign = ival > 0 ? NUMERIC_POS : NUMERIC_NEG;
+ else
+ res_sign = ival > 0 ? NUMERIC_NEG : NUMERIC_POS;
+ res_weight = var->weight + ival_weight + 3;
+ /* The number of accurate result digits we need to produce: */
+ res_ndigits = var_ndigits + 3;
+ maxdigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS +
+ MUL_GUARD_DIGITS;
+ res_ndigits = Min(res_ndigits, maxdigits);
+
+ if (res_ndigits < 3)
+ {
+ /* All input digits will be ignored; so result is zero */
+ zero_var(result);
+ result->dscale = rscale;
+ return;
+ }
+
+ res_buf = digitbuf_alloc(res_ndigits + 1);
+ res_buf[0] = 0; /* spare digit for later
rounding */
+ res_digits = res_buf + 1;
+
+ /*
+ * Now compute the product digits by procssing the input digits in
reverse
+ * and propagating the carry up as we go.
+ *
+ * In this algorithm, the carry from one digit to the next is at most
+ * factor - 1, and product is at most factor * NBASE - 1, and so it
needs
+ * to be a 64-bit integer if this exceeds UINT_MAX.
+ */
+ factor = abs(ival);
+ carry = 0;
+
+ if (factor <= UINT_MAX / NBASE)
+ {
+ /* product cannot overflow 32 bits */
+ uint32 product;
+
+ for (int i = res_ndigits - 4; i >= 0; i--)
+ {
+ product = factor * var_digits[i] + carry;
+ res_digits[i + 3] = (NumericDigit) (product % NBASE);
+ carry = product / NBASE;
+ }
+ res_digits[2] = (NumericDigit) (carry % NBASE);
+ carry = carry / NBASE;
+ res_digits[1] = (NumericDigit) (carry % NBASE);
+ res_digits[0] = (NumericDigit) (carry / NBASE);
+ }
+ else
+ {
+ /* product may exceed 32 bits */
+ uint64 product;
+
+ for (int i = res_ndigits - 4; i >= 0; i--)
+ {
+ product = (uint64) factor * var_digits[i] + carry;
+ res_digits[i + 3] = (NumericDigit) (product % NBASE);
+ carry = (uint32) (product / NBASE);
+ }
+ res_digits[2] = (NumericDigit) (carry % NBASE);
+ carry = carry / NBASE;
+ res_digits[1] = (NumericDigit) (carry % NBASE);
+ res_digits[0] = (NumericDigit) (carry / NBASE);
+ }
+
+ /* Store the product in result */
+ digitbuf_free(result->buf);
+ result->ndigits = res_ndigits;
+ result->buf = res_buf;
+ result->digits = res_digits;
+ result->weight = res_weight;
+ result->sign = res_sign;
+
+ /* Round to target rscale (and set result->dscale) */
+ round_var(result, rscale);
+
+ /* Strip leading and trailing zeroes */
+ strip_var(result);
+}
+
+
/*
 * mul_var_patch_small() -
 *
 *	Compute the product of var1 and var2 to rscale fractional digits and
 *	store it in result.
 *
 *	Experimental variant of mul_var() that delegates to mul_var_small() when
 *	the shorter input has no more than three NBASE digits; otherwise it uses
 *	the standard mul_var() algorithm.
 */
static void
mul_var_patch_small(const NumericVar *var1, const NumericVar *var2, NumericVar *result,
					int rscale)
{
	int			res_ndigits;
	int			res_sign;
	int			res_weight;
	int			maxdigits;
	int		   *dig;
	int			carry;
	int			maxdig;
	int			newdig;
	int			var1ndigits;
	int			var2ndigits;
	NumericDigit *var1digits;
	NumericDigit *var2digits;
	NumericDigit *res_digits;
	int			i,
				i1,
				i2;

	/*
	 * Arrange for var1 to be the shorter of the two numbers.  This improves
	 * performance because the inner multiplication loop is much simpler than
	 * the outer loop, so it's better to have a smaller number of iterations
	 * of the outer loop.  This also reduces the number of times that the
	 * accumulator array needs to be normalized.
	 */
	if (var1->ndigits > var2->ndigits)
	{
		const NumericVar *tmp = var1;

		var1 = var2;
		var2 = tmp;
	}

	/* copy these values into local vars for speed in inner loop */
	var1ndigits = var1->ndigits;
	var2ndigits = var2->ndigits;
	var1digits = var1->digits;
	var2digits = var2->digits;

	if (var1ndigits == 0)
	{
		/* one or both inputs is zero; so is result */
		zero_var(result);
		result->dscale = rscale;
		return;
	}

	/*
	 * If var1 has 3 digits or fewer, delegate to mul_var_small() which uses a
	 * faster short multiplication algorithm.
	 */
	if (var1ndigits <= 3)
	{
		mul_var_small(var1, var2, result, rscale);
		return;
	}

	/* Determine result sign and (maximum possible) weight */
	if (var1->sign == var2->sign)
		res_sign = NUMERIC_POS;
	else
		res_sign = NUMERIC_NEG;
	res_weight = var1->weight + var2->weight + 2;

	/*
	 * Determine the number of result digits to compute.  If the exact result
	 * would have more than rscale fractional digits, truncate the computation
	 * with MUL_GUARD_DIGITS guard digits, i.e., ignore input digits that
	 * would only contribute to the right of that.  (This will give the exact
	 * rounded-to-rscale answer unless carries out of the ignored positions
	 * would have propagated through more than MUL_GUARD_DIGITS digits.)
	 *
	 * Note: an exact computation could not produce more than var1ndigits +
	 * var2ndigits digits, but we allocate one extra output digit in case
	 * rscale-driven rounding produces a carry out of the highest exact digit.
	 */
	res_ndigits = var1ndigits + var2ndigits + 1;
	maxdigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS +
		MUL_GUARD_DIGITS;
	res_ndigits = Min(res_ndigits, maxdigits);

	if (res_ndigits < 3)
	{
		/* All input digits will be ignored; so result is zero */
		zero_var(result);
		result->dscale = rscale;
		return;
	}

	/*
	 * We do the arithmetic in an array "dig[]" of signed int's.  Since
	 * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
	 * to avoid normalizing carries immediately.
	 *
	 * maxdig tracks the maximum possible value of any dig[] entry; when this
	 * threatens to exceed INT_MAX, we take the time to propagate carries.
	 * Furthermore, we need to ensure that overflow doesn't occur during the
	 * carry propagation passes either.  The carry values could be as much as
	 * INT_MAX/NBASE, so really we must normalize when digits threaten to
	 * exceed INT_MAX - INT_MAX/NBASE.
	 *
	 * To avoid overflow in maxdig itself, it actually represents the max
	 * possible value divided by NBASE-1, ie, at the top of the loop it is
	 * known that no dig[] entry exceeds maxdig * (NBASE-1).
	 */
	dig = (int *) palloc0(res_ndigits * sizeof(int));
	maxdig = 0;

	/*
	 * The least significant digits of var1 should be ignored if they don't
	 * contribute directly to the first res_ndigits digits of the result that
	 * we are computing.
	 *
	 * Digit i1 of var1 and digit i2 of var2 are multiplied and added to digit
	 * i1+i2+2 of the accumulator array, so we need only consider digits of
	 * var1 for which i1 <= res_ndigits - 3.
	 */
	for (i1 = Min(var1ndigits - 1, res_ndigits - 3); i1 >= 0; i1--)
	{
		NumericDigit var1digit = var1digits[i1];

		if (var1digit == 0)
			continue;

		/* Time to normalize? */
		maxdig += var1digit;
		if (maxdig > (INT_MAX - INT_MAX / NBASE) / (NBASE - 1))
		{
			/* Yes, do it */
			carry = 0;
			for (i = res_ndigits - 1; i >= 0; i--)
			{
				newdig = dig[i] + carry;
				if (newdig >= NBASE)
				{
					carry = newdig / NBASE;
					newdig -= carry * NBASE;
				}
				else
					carry = 0;
				dig[i] = newdig;
			}
			Assert(carry == 0);
			/* Reset maxdig to indicate new worst-case */
			maxdig = 1 + var1digit;
		}

		/*
		 * Add the appropriate multiple of var2 into the accumulator.
		 *
		 * As above, digits of var2 can be ignored if they don't contribute,
		 * so we only include digits for which i1+i2+2 < res_ndigits.
		 *
		 * This inner loop is the performance bottleneck for multiplication,
		 * so we want to keep it simple enough so that it can be
		 * auto-vectorized.  Accordingly, process the digits left-to-right
		 * even though schoolbook multiplication would suggest right-to-left.
		 * Since we aren't propagating carries in this loop, the order does
		 * not matter.
		 */
		{
			int			i2limit = Min(var2ndigits, res_ndigits - i1 - 2);
			int		   *dig_i1_2 = &dig[i1 + 2];

			for (i2 = 0; i2 < i2limit; i2++)
				dig_i1_2[i2] += var1digit * var2digits[i2];
		}
	}

	/*
	 * Now we do a final carry propagation pass to normalize the result, which
	 * we combine with storing the result digits into the output.  Note that
	 * this is still done at full precision w/guard digits.
	 */
	alloc_var(result, res_ndigits);
	res_digits = result->digits;
	carry = 0;
	for (i = res_ndigits - 1; i >= 0; i--)
	{
		newdig = dig[i] + carry;
		if (newdig >= NBASE)
		{
			carry = newdig / NBASE;
			newdig -= carry * NBASE;
		}
		else
			carry = 0;
		res_digits[i] = newdig;
	}
	Assert(carry == 0);

	pfree(dig);

	/*
	 * Finally, round the result to the requested precision.
	 */
	result->weight = res_weight;
	result->sign = res_sign;

	/* Round to target rscale (and set result->dscale) */
	round_var(result, rscale);

	/* Strip leading and trailing zeroes */
	strip_var(result);
}
+
+
+/*
+ * mul_var_small() -
+ *
+ * This has the same API as mul_var, but it assumes that var1 has no more
+ * than 3 digits and var2 has at least as many digits as var1. For
variables
+ * satisfying these conditions, the product can be computed more quickly
than
+ * the general algorithm used in mul_var.
+ */
+static void
+mul_var_small(const NumericVar *var1, const NumericVar *var2,
+ NumericVar *result, int rscale)
+{
+ int var1ndigits = var1->ndigits;
+ int var2ndigits = var2->ndigits;
+ NumericDigit *var1digits = var1->digits;
+ NumericDigit *var2digits = var2->digits;
+ int res_sign;
+ int res_weight;
+ int res_ndigits;
+ int maxdigits;
+ NumericDigit *res_buf;
+ NumericDigit *res_digits;
+ int carry;
+ int term;
+
+ /* Check preconditions */
+ Assert(var1ndigits <= 3);
+ Assert(var2ndigits >= var1ndigits);
+
+ /* Determine result sign and (maximum possible) weight */
+ if (var1->sign == var2->sign)
+ res_sign = NUMERIC_POS;
+ else
+ res_sign = NUMERIC_NEG;
+ res_weight = var1->weight + var2->weight + 2;
+
+ /* Determine the number of result digits to compute - see mul_var() */
+ res_ndigits = var1ndigits + var2ndigits + 1;
+ maxdigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS +
+ MUL_GUARD_DIGITS;
+ res_ndigits = Min(res_ndigits, maxdigits);
+
+ if (res_ndigits < 3)
+ {
+ /* All input digits will be ignored; so result is zero */
+ zero_var(result);
+ result->dscale = rscale;
+ return;
+ }
+
+ /* Allocate result digit array */
+ res_buf = digitbuf_alloc(res_ndigits);
+ res_buf[0] = 0; /* spare digit for later
rounding */
+ res_digits = res_buf + 1;
+
+ /*
+ * Compute the result digits in reverse, in one pass, propagating the
+ * carry up as we go.
+ *
+ * This computes res_digits[res_ndigits - 2], ... res_digits[0] by
summing
+ * the products var1digits[i1] * var2digits[i2] for which i1 + i2 + 1 is
+ * the result index.
+ */
+ switch (var1ndigits)
+ {
+ case 1:
+ /* ---------
+ * 1-digit case:
+ * var1ndigits = 1
+ * var2ndigits >= 1
+ * 3 <= res_ndigits <= var2ndigits + 2
+ * ----------
+ */
+ carry = 0;
+ for (int i = res_ndigits - 3; i >= 0; i--)
+ {
+ term = (int) var1digits[0] * var2digits[i] +
carry;
+ res_digits[i + 1] = (NumericDigit) (term %
NBASE);
+ carry = term / NBASE;
+ }
+ res_digits[0] = (NumericDigit) carry;
+ break;
+
+ case 2:
+ /* ---------
+ * 2-digit case:
+ * var1ndigits = 2
+ * var2ndigits >= 2
+ * 3 <= res_ndigits <= var2ndigits + 3
+ * ----------
+ */
+ /* last result digit and carry */
+ term = 0;
+ if (res_ndigits - 3 < var2ndigits)
+ term += (int) var1digits[0] *
var2digits[res_ndigits - 3];
+ if (res_ndigits > 3)
+ term += (int) var1digits[1] *
var2digits[res_ndigits - 4];
+ res_digits[res_ndigits - 2] = (NumericDigit) (term %
NBASE);
+ carry = term / NBASE;
+
+ /* remaining digits, except for the first two */
+ for (int i = res_ndigits - 4; i >= 1; i--)
+ {
+ term = (int) var1digits[0] * var2digits[i] +
+ (int) var1digits[1] * var2digits[i - 1]
+ carry;
+ res_digits[i + 1] = (NumericDigit) (term %
NBASE);
+ carry = term / NBASE;
+ }
+
+ /* first two digits */
+ term = (int) var1digits[0] * var2digits[0] + carry;
+ res_digits[1] = (NumericDigit) (term % NBASE);
+ res_digits[0] = (NumericDigit) (term / NBASE);
+ break;
+
+ case 3:
+ /* ---------
+ * 3-digit case:
+ * var1ndigits = 3
+ * var2ndigits >= 3
+ * 3 <= res_ndigits <= var2ndigits + 4
+ * ----------
+ */
+ /* last result digit and carry */
+ term = 0;
+ if (res_ndigits - 3 < var2ndigits)
+ term += (int) var1digits[0] *
var2digits[res_ndigits - 3];
+ if (res_ndigits > 3 && res_ndigits - 4 < var2ndigits)
+ term += (int) var1digits[1] *
var2digits[res_ndigits - 4];
+ if (res_ndigits > 4)
+ term += (int) var1digits[2] *
var2digits[res_ndigits - 5];
+ res_digits[res_ndigits - 2] = (NumericDigit) (term %
NBASE);
+ carry = term / NBASE;
+
+ /* penultimate result digit */
+ term = carry;
+ if (res_ndigits > 3 && res_ndigits - 4 < var2ndigits)
+ term += (int) var1digits[0] *
var2digits[res_ndigits - 4];
+ if (res_ndigits > 4)
+ term += (int) var1digits[1] *
var2digits[res_ndigits - 5];
+ if (res_ndigits > 5)
+ term += (int) var1digits[2] *
var2digits[res_ndigits - 6];
+ res_digits[res_ndigits - 3] = (NumericDigit) (term %
NBASE);
+ carry = term / NBASE;
+
+ /* remaining digits, except for the first three */
+ for (int i = res_ndigits - 5; i >= 2; i--)
+ {
+ term = (int) var1digits[0] * var2digits[i] +
+ (int) var1digits[1] * var2digits[i - 1]
+
+ (int) var1digits[2] * var2digits[i - 2]
+ carry;
+ res_digits[i + 1] = (NumericDigit) (term %
NBASE);
+ carry = term / NBASE;
+ }
+
+ /* first three digits */
+ term = (int) var1digits[0] * var2digits[1] +
+ (int) var1digits[1] * var2digits[0] + carry;
+ res_digits[2] = (NumericDigit) (term % NBASE);
+ carry = term / NBASE;
+ term = (int) var1digits[0] * var2digits[0] + carry;
+ res_digits[1] = (NumericDigit) (term % NBASE);
+ res_digits[0] = (NumericDigit) (term / NBASE);
+ break;
+ }
+
+ /* Store the product in result (minus extra rounding digit) */
+ digitbuf_free(result->buf);
+ result->ndigits = res_ndigits - 1;
+ result->buf = res_buf;
+ result->digits = res_digits;
+ result->weight = res_weight - 1;
+ result->sign = res_sign;
+
+ /* Round to target rscale (and set result->dscale) */
+ round_var(result, rscale);
+
+ /* Strip leading and trailing zeroes */
+ strip_var(result);
+}
/*
* div_var() -
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d4ac578ae6..c85ed20a99 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -4465,6 +4465,18 @@
{ oid => '1726',
proname => 'numeric_mul', prorettype => 'numeric',
proargtypes => 'numeric numeric', prosrc => 'numeric_mul' },
+{ oid => '6347',
+ proname => 'numeric_mul_head', prorettype => 'numeric',
+ proargtypes => 'numeric numeric int4', prosrc => 'numeric_mul_head' },
+{ oid => '6348',
+ proname => 'numeric_mul_patch_inline', prorettype => 'numeric',
+ proargtypes => 'numeric numeric int4', prosrc => 'numeric_mul_patch_inline'
},
+{ oid => '6349',
+ proname => 'numeric_mul_patch_int', prorettype => 'numeric',
+ proargtypes => 'numeric numeric int4', prosrc => 'numeric_mul_patch_int' },
+{ oid => '6350',
+ proname => 'numeric_mul_patch_small', prorettype => 'numeric',
+ proargtypes => 'numeric numeric int4', prosrc => 'numeric_mul_patch_small' },
{ oid => '1727',
proname => 'numeric_div', prorettype => 'numeric',
proargtypes => 'numeric numeric', prosrc => 'numeric_div' },
diff --git a/src/include/utils/numeric.h b/src/include/utils/numeric.h
index 43c75c436f..2bc400c741 100644
--- a/src/include/utils/numeric.h
+++ b/src/include/utils/numeric.h
@@ -97,6 +97,14 @@ extern Numeric numeric_sub_opt_error(Numeric num1, Numeric
num2,
bool
*have_error);
extern Numeric numeric_mul_opt_error(Numeric num1, Numeric num2,
bool
*have_error);
+extern Numeric numeric_mul_head_opt_error(Numeric num1, Numeric num2,
+ int32
rscale_adjustment, bool *have_error);
+extern Numeric numeric_mul_patch_inline_opt_error(Numeric num1, Numeric num2,
+ int32
rscale_adjustment, bool *have_error);
+extern Numeric numeric_mul_patch_int_opt_error(Numeric num1, Numeric num2,
+ int32
rscale_adjustment, bool *have_error);
+extern Numeric numeric_mul_patch_small_opt_error(Numeric num1, Numeric num2,
+ int32
rscale_adjustment, bool *have_error);
extern Numeric numeric_div_opt_error(Numeric num1, Numeric num2,
bool
*have_error);
extern Numeric numeric_mod_opt_error(Numeric num1, Numeric num2,
diff --git a/test-mul-var.sql b/test-mul-var.sql
new file mode 100644
index 0000000000..cf7c55a089
--- /dev/null
+++ b/test-mul-var.sql
@@ -0,0 +1,50 @@
-- Test harness comparing the candidate numeric multiplication
-- implementations against the HEAD implementation over a range of digit
-- counts, display scales, and rscale adjustments.
-- NOTE(review): random(numeric, numeric) requires PostgreSQL 17+ — confirm
-- the target server version before running.

-- Each row is one test case; result is filled in later from the HEAD
-- implementation and serves as the reference value.
CREATE TABLE test_numeric_mul_patched (
    var1 numeric,
    var2 numeric,
    rscale_adjustment int,
    result numeric
);

-- Generate 1000 rounds of random factor pairs covering 1..4 base-10000
-- digits per factor, every dscale for each width, and every applicable
-- rscale adjustment.
DO $$
DECLARE
var1 numeric;
var2 numeric;
BEGIN
    FOR i IN 1..1000 LOOP
        RAISE NOTICE '%', i;
        FOR var1ndigits IN 1..4 LOOP
            FOR var2ndigits IN 1..4 LOOP
                FOR var1dscale IN 0..(var1ndigits*4) LOOP
                    FOR var2dscale IN 0..(var2ndigits*4) LOOP
                        FOR rscale_adjustment IN 0..(var1dscale+var2dscale) LOOP
                            -- random value with var1ndigits*4 decimal digits,
                            -- scaled down to var1dscale fractional digits
                            var1 := round(random(
                                format('1%s',repeat('0',(var1ndigits-1)*4-1))::numeric,
                                format('%s',repeat('9',var1ndigits*4))::numeric
                            ) / 10::numeric^var1dscale, var1dscale);
                            -- likewise for the second factor
                            var2 := round(random(
                                format('1%s',repeat('0',(var2ndigits-1)*4-1))::numeric,
                                format('%s',repeat('9',var2ndigits*4))::numeric
                            ) / 10::numeric^var2dscale, var2dscale);
                            INSERT INTO test_numeric_mul_patched
                                (var1, var2, rscale_adjustment)
                            VALUES
                                (var1, var2, -rscale_adjustment);
                        END LOOP;
                    END LOOP;
                END LOOP;
            END LOOP;
        END LOOP;
    END LOOP;
END $$;

-- Intentionally unfiltered UPDATE: compute the HEAD reference result for
-- every generated test case.
UPDATE test_numeric_mul_patched SET result = numeric_mul_head(var1, var2, rscale_adjustment);

-- Per rscale adjustment, count mismatches of each patched implementation
-- against the HEAD reference; all mismatch counts should be zero.
SELECT
    rscale_adjustment,
    COUNT(*),
    COUNT(*) FILTER (WHERE result IS DISTINCT FROM numeric_mul_patch_int(var1,var2,rscale_adjustment)) AS "mul_var_int",
    COUNT(*) FILTER (WHERE result IS DISTINCT FROM numeric_mul_patch_small(var1,var2,rscale_adjustment)) AS "mul_var_small",
    COUNT(*) FILTER (WHERE result IS DISTINCT FROM numeric_mul_patch_inline(var1,var2,rscale_adjustment)) AS "mul_var inlined"
FROM test_numeric_mul_patched
GROUP BY 1
ORDER BY 1;