On 2020/05/08 12:10, Kyotaro Horiguchi wrote:
At Fri, 8 May 2020 11:31:42 +0900, Fujii Masao <masao.fu...@oss.nttdata.com> 
wrote in
You mean that pg_lsn_pli() and pg_lsn_mii() should emit an error like
"the number of bytes to add/subtract cannnot be NaN" when NaN is
specified?
The function is called while executing an expression, so "NaN cannot
be used in this expression" or something like that would work.

This sounds ambiguous. I like to use clearer messages like

cannot add NaN to pg_lsn
cannot subtract NaN from pg_lsn

They works fine to me.

Ok, I updated pg_lsn_pli() and pg_lsn_mii() so that they emit an error
when NaN is specified as the number of bytes.


I'm not sure if int128 is available in every environments.
In second thought, I found that we don't have enough substitute
functions for the platforms without a native implement.  Instead,
there are some overflow-safe uint64 math functions, that is,
pg_add/sub_u64_overflow. This patch defines numeric_pg_lsn which is
substantially numeric_uint64.  By using them, for example, we can make
pg_lsn_pli mainly with integer arithmetic as follows.

Sorry, I'm not sure what the benefit of this approach...

(If we don't allow negative nbytes,)
We accept numeric so that the operators can accept values out of range
of int64, but we don't need to perform all arithmetic in numeric. That
approach does less numeric arithmetic, that is, faster and simpler.
We don't need to string'ify LSN with it. That avoid stack consumption.

If invalid values are given as the addend, the following message would
make sense.
=# select '1/1::pg_lsn + 'NaN'::numeric;
ERROR:  cannot use NaN in this expression
=# select '1/1::pg_lsn + '-1'::numeric;
ERROR:  numeric value out of range for this expression

Could you tell me why we should reject this calculation?
IMO it's ok to add the negative number, and which is possible
with the latest patch.

Sorry, I misread the patch as it rejected -1 for *nbytes*, by seeing
numeric_pg_lsn.

Finally, I'm convinced that we lack required integer arithmetic
infrastructure to perform the objective.

The patch looks good to me except the size of buf[], but I don't
strongly object to that.

Ok, I changed the size of buf[] to 32.
Attached is the updated version of the patch.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/doc/src/sgml/datatype.sgml b/doc/src/sgml/datatype.sgml
index a8d0780387..a86b794ce0 100644
--- a/doc/src/sgml/datatype.sgml
+++ b/doc/src/sgml/datatype.sgml
@@ -4823,7 +4823,13 @@ SELECT * FROM pg_attribute
     standard comparison operators, like <literal>=</literal> and
     <literal>&gt;</literal>.  Two LSNs can be subtracted using the
     <literal>-</literal> operator; the result is the number of bytes separating
-    those write-ahead log locations.
+    those write-ahead log locations.  Also the number of bytes can be
+    added into and subtracted from LSN using the
+    <literal>+(pg_lsn,numeric)</literal> and
+    <literal>-(pg_lsn,numeric)</literal> operators, respectively. Note that
+    the calculated LSN should be in the range of <type>pg_lsn</type> type,
+    i.e., between <literal>0/0</literal> and
+    <literal>FFFFFFFF/FFFFFFFF</literal>.
    </para>
   </sect1>
 
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 9986132b45..94593c7f63 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -41,6 +41,7 @@
 #include "utils/guc.h"
 #include "utils/int8.h"
 #include "utils/numeric.h"
+#include "utils/pg_lsn.h"
 #include "utils/sortsupport.h"
 
 /* ----------
@@ -472,6 +473,7 @@ static void apply_typmod(NumericVar *var, int32 typmod);
 static bool numericvar_to_int32(const NumericVar *var, int32 *result);
 static bool numericvar_to_int64(const NumericVar *var, int64 *result);
 static void int64_to_numericvar(int64 val, NumericVar *var);
+static bool numericvar_to_uint64(const NumericVar *var, uint64 *result);
 #ifdef HAVE_INT128
 static bool numericvar_to_int128(const NumericVar *var, int128 *result);
 static void int128_to_numericvar(int128 val, NumericVar *var);
@@ -3688,6 +3690,30 @@ numeric_float4(PG_FUNCTION_ARGS)
 }
 
 
+Datum
+numeric_pg_lsn(PG_FUNCTION_ARGS)
+{
+       Numeric         num = PG_GETARG_NUMERIC(0);
+       NumericVar      x;
+       XLogRecPtr      result;
+
+       if (NUMERIC_IS_NAN(num))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot convert NaN to pg_lsn")));
+
+       /* Convert to variable format and thence to pg_lsn */
+       init_var_from_num(num, &x);
+
+       if (!numericvar_to_uint64(&x, (uint64 *) &result))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("pg_lsn out of range")));
+
+       PG_RETURN_LSN(result);
+}
+
+
 /* ----------------------------------------------------------------------
  *
  * Aggregate functions
@@ -6739,6 +6765,78 @@ int64_to_numericvar(int64 val, NumericVar *var)
        var->weight = ndigits - 1;
 }
 
+/*
+ * Convert numeric to uint64, rounding if needed.
+ *
+ * If overflow, return false (no error is raised).  Return true if okay.
+ */
+static bool
+numericvar_to_uint64(const NumericVar *var, uint64 *result)
+{
+       NumericDigit *digits;
+       int                     ndigits;
+       int                     weight;
+       int                     i;
+       uint64          val;
+       NumericVar      rounded;
+
+       /* Round to nearest integer */
+       init_var(&rounded);
+       set_var_from_var(var, &rounded);
+       round_var(&rounded, 0);
+
+       /* Check for zero input */
+       strip_var(&rounded);
+       ndigits = rounded.ndigits;
+       if (ndigits == 0)
+       {
+               *result = 0;
+               free_var(&rounded);
+               return true;
+       }
+
+       /* Check for negative input */
+       if (rounded.sign == NUMERIC_NEG)
+       {
+               free_var(&rounded);
+               return false;
+       }
+
+       /*
+        * For input like 10000000000, we must treat stripped digits as real. So
+        * the loop assumes there are weight+1 digits before the decimal point.
+        */
+       weight = rounded.weight;
+       Assert(weight >= 0 && ndigits <= weight + 1);
+
+       /* Construct the result */
+       digits = rounded.digits;
+       val = digits[0];
+       for (i = 1; i <= weight; i++)
+       {
+               if (unlikely(pg_mul_u64_overflow(val, NBASE, &val)))
+               {
+                       free_var(&rounded);
+                       return false;
+               }
+
+               if (i < ndigits)
+               {
+                       if (unlikely(pg_add_u64_overflow(val, digits[i], &val)))
+                       {
+                               free_var(&rounded);
+                               return false;
+                       }
+               }
+       }
+
+       free_var(&rounded);
+
+       *result = val;
+
+       return true;
+}
+
 #ifdef HAVE_INT128
 /*
  * Convert numeric to int128, rounding if needed.
diff --git a/src/backend/utils/adt/pg_lsn.c b/src/backend/utils/adt/pg_lsn.c
index d9754a7778..ad0a7bd869 100644
--- a/src/backend/utils/adt/pg_lsn.c
+++ b/src/backend/utils/adt/pg_lsn.c
@@ -16,6 +16,7 @@
 #include "funcapi.h"
 #include "libpq/pqformat.h"
 #include "utils/builtins.h"
+#include "utils/numeric.h"
 #include "utils/pg_lsn.h"
 
 #define MAXPG_LSNLEN                   17
@@ -248,3 +249,71 @@ pg_lsn_mi(PG_FUNCTION_ARGS)
 
        return result;
 }
+
+/*
+ * Add the number of bytes to pg_lsn, giving a new pg_lsn.
+ * Must handle both positive and negative numbers of bytes.
+ */
+Datum
+pg_lsn_pli(PG_FUNCTION_ARGS)
+{
+       XLogRecPtr      lsn = PG_GETARG_LSN(0);
+       Numeric         nbytes = PG_GETARG_NUMERIC(1);
+       Datum           num;
+       Datum           res;
+       char            buf[32];
+
+       if (numeric_is_nan(nbytes))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot add NaN to pg_lsn")));
+
+       /* Convert to numeric */
+       snprintf(buf, sizeof(buf), UINT64_FORMAT, lsn);
+       num = DirectFunctionCall3(numeric_in,
+                                                         CStringGetDatum(buf),
+                                                         ObjectIdGetDatum(0),
+                                                         Int32GetDatum(-1));
+
+       /* Add two numerics */
+       res = DirectFunctionCall2(numeric_add,
+                                                         NumericGetDatum(num),
+                                                         
NumericGetDatum(nbytes));
+
+       /* Convert to pg_lsn */
+       return DirectFunctionCall1(numeric_pg_lsn, res);
+}
+
+/*
+ * Subtract the number of bytes from pg_lsn, giving a new pg_lsn.
+ * Must handle both positive and negative numbers of bytes.
+ */
+Datum
+pg_lsn_mii(PG_FUNCTION_ARGS)
+{
+       XLogRecPtr      lsn = PG_GETARG_LSN(0);
+       Numeric         nbytes = PG_GETARG_NUMERIC(1);
+       Datum           num;
+       Datum           res;
+       char            buf[32];
+
+       if (numeric_is_nan(nbytes))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("cannot subtract NaN from pg_lsn")));
+
+       /* Convert to numeric */
+       snprintf(buf, sizeof(buf), UINT64_FORMAT, lsn);
+       num = DirectFunctionCall3(numeric_in,
+                                                         CStringGetDatum(buf),
+                                                         ObjectIdGetDatum(0),
+                                                         Int32GetDatum(-1));
+
+       /* Subtract two numerics */
+       res = DirectFunctionCall2(numeric_sub,
+                                                         NumericGetDatum(num),
+                                                         
NumericGetDatum(nbytes));
+
+       /* Convert to pg_lsn */
+       return DirectFunctionCall1(numeric_pg_lsn, res);
+}
diff --git a/src/include/catalog/pg_operator.dat 
b/src/include/catalog/pg_operator.dat
index 00ada7e48f..15dfa1497a 100644
--- a/src/include/catalog/pg_operator.dat
+++ b/src/include/catalog/pg_operator.dat
@@ -2909,6 +2909,17 @@
 { oid => '3228', descr => 'minus',
   oprname => '-', oprleft => 'pg_lsn', oprright => 'pg_lsn',
   oprresult => 'numeric', oprcode => 'pg_lsn_mi' },
+{ oid => '5025', descr => 'add',
+  oprname => '+', oprleft => 'pg_lsn', oprright => 'numeric',
+  oprresult => 'pg_lsn', oprcom => '+(numeric,pg_lsn)',
+  oprcode => 'pg_lsn_pli' },
+{ oid => '5026', descr => 'add',
+  oprname => '+', oprleft => 'numeric', oprright => 'pg_lsn',
+  oprresult => 'pg_lsn', oprcom => '+(pg_lsn,numeric)',
+  oprcode => 'numeric_pl_pg_lsn' },
+{ oid => '5027', descr => 'subtract',
+  oprname => '-', oprleft => 'pg_lsn', oprright => 'numeric',
+  oprresult => 'pg_lsn', oprcode => 'pg_lsn_mii' },
 
 # enum operators
 { oid => '3516', descr => 'equal',
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4bce3ad8de..0eb94d92e3 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -4398,6 +4398,9 @@
 { oid => '1783', descr => 'convert numeric to int2',
   proname => 'int2', prorettype => 'int2', proargtypes => 'numeric',
   prosrc => 'numeric_int2' },
+{ oid => '6103', descr => 'convert numeric to pg_lsn',
+  proname => 'pg_lsn', prorettype => 'pg_lsn', proargtypes => 'numeric',
+  prosrc => 'numeric_pg_lsn' },
 
 { oid => '3556', descr => 'convert jsonb to boolean',
   proname => 'bool', prorettype => 'bool', proargtypes => 'jsonb',
@@ -8578,6 +8581,15 @@
 { oid => '4188', descr => 'smaller of two',
   proname => 'pg_lsn_smaller', prorettype => 'pg_lsn',
   proargtypes => 'pg_lsn pg_lsn', prosrc => 'pg_lsn_smaller' },
+{ oid => '5022',
+  proname => 'pg_lsn_pli', prorettype => 'pg_lsn',
+  proargtypes => 'pg_lsn numeric', prosrc => 'pg_lsn_pli' },
+{ oid => '5023',
+  proname => 'numeric_pl_pg_lsn', prolang => 'sql', prorettype => 'pg_lsn',
+  proargtypes => 'numeric pg_lsn', prosrc => 'select $2 + $1' },
+{ oid => '5024',
+  proname => 'pg_lsn_mii', prorettype => 'pg_lsn',
+  proargtypes => 'pg_lsn numeric', prosrc => 'pg_lsn_mii' },
 
 # enum related procs
 { oid => '3504', descr => 'I/O',
diff --git a/src/test/regress/expected/numeric.out 
b/src/test/regress/expected/numeric.out
index c7fe63d037..978d38b729 100644
--- a/src/test/regress/expected/numeric.out
+++ b/src/test/regress/expected/numeric.out
@@ -2315,3 +2315,30 @@ FROM (VALUES (0::numeric, 0::numeric),
 
 SELECT lcm(9999 * (10::numeric)^131068 + (10::numeric^131068 - 1), 2); -- 
overflow
 ERROR:  value overflows numeric format
+--
+-- Tests for pg_lsn()
+--
+SELECT pg_lsn(23783416::numeric);
+  pg_lsn   
+-----------
+ 0/16AE7F8
+(1 row)
+
+SELECT pg_lsn(0::numeric);
+ pg_lsn 
+--------
+ 0/0
+(1 row)
+
+SELECT pg_lsn(18446744073709551615::numeric);
+      pg_lsn       
+-------------------
+ FFFFFFFF/FFFFFFFF
+(1 row)
+
+SELECT pg_lsn(-1::numeric);
+ERROR:  pg_lsn out of range
+SELECT pg_lsn(18446744073709551616::numeric);
+ERROR:  pg_lsn out of range
+SELECT pg_lsn('NaN'::numeric);
+ERROR:  cannot convert NaN to pg_lsn
diff --git a/src/test/regress/expected/pg_lsn.out 
b/src/test/regress/expected/pg_lsn.out
index 64d41dfdad..99a748a6a7 100644
--- a/src/test/regress/expected/pg_lsn.out
+++ b/src/test/regress/expected/pg_lsn.out
@@ -71,6 +71,56 @@ SELECT '0/16AE7F8'::pg_lsn - '0/16AE7F7'::pg_lsn;
         1
 (1 row)
 
+SELECT '0/16AE7F7'::pg_lsn + 16::numeric;
+ ?column?  
+-----------
+ 0/16AE807
+(1 row)
+
+SELECT 16::numeric + '0/16AE7F7'::pg_lsn;
+ ?column?  
+-----------
+ 0/16AE807
+(1 row)
+
+SELECT '0/16AE7F7'::pg_lsn - 16::numeric;
+ ?column?  
+-----------
+ 0/16AE7E7
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 1::numeric;
+     ?column?      
+-------------------
+ FFFFFFFF/FFFFFFFF
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 2::numeric; -- out of range error
+ERROR:  pg_lsn out of range
+SELECT '0/1'::pg_lsn - 1::numeric;
+ ?column? 
+----------
+ 0/0
+(1 row)
+
+SELECT '0/1'::pg_lsn - 2::numeric; -- out of range error
+ERROR:  pg_lsn out of range
+SELECT '0/0'::pg_lsn + ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn);
+     ?column?      
+-------------------
+ FFFFFFFF/FFFFFFFF
+(1 row)
+
+SELECT 'FFFFFFFF/FFFFFFFF'::pg_lsn - ('FFFFFFFF/FFFFFFFF'::pg_lsn - 
'0/0'::pg_lsn);
+ ?column? 
+----------
+ 0/0
+(1 row)
+
+SELECT '0/16AE7F7'::pg_lsn + 'NaN'::numeric;
+ERROR:  cannot add NaN to pg_lsn
+SELECT '0/16AE7F7'::pg_lsn - 'NaN'::numeric;
+ERROR:  cannot subtract NaN from pg_lsn
 -- Check btree and hash opclasses
 EXPLAIN (COSTS OFF)
 SELECT DISTINCT (i || '/' || j)::pg_lsn f
diff --git a/src/test/regress/sql/numeric.sql b/src/test/regress/sql/numeric.sql
index 41475a9a24..fdfe6fa3c7 100644
--- a/src/test/regress/sql/numeric.sql
+++ b/src/test/regress/sql/numeric.sql
@@ -1111,3 +1111,13 @@ FROM (VALUES (0::numeric, 0::numeric),
              (4232.820::numeric, 132.72000::numeric)) AS v(a, b);
 
 SELECT lcm(9999 * (10::numeric)^131068 + (10::numeric^131068 - 1), 2); -- 
overflow
+
+--
+-- Tests for pg_lsn()
+--
+SELECT pg_lsn(23783416::numeric);
+SELECT pg_lsn(0::numeric);
+SELECT pg_lsn(18446744073709551615::numeric);
+SELECT pg_lsn(-1::numeric);
+SELECT pg_lsn(18446744073709551616::numeric);
+SELECT pg_lsn('NaN'::numeric);
diff --git a/src/test/regress/sql/pg_lsn.sql b/src/test/regress/sql/pg_lsn.sql
index 2c143c82ff..615368ba96 100644
--- a/src/test/regress/sql/pg_lsn.sql
+++ b/src/test/regress/sql/pg_lsn.sql
@@ -27,6 +27,17 @@ SELECT '0/16AE7F7' < '0/16AE7F8'::pg_lsn;
 SELECT '0/16AE7F8' > pg_lsn '0/16AE7F7';
 SELECT '0/16AE7F7'::pg_lsn - '0/16AE7F8'::pg_lsn;
 SELECT '0/16AE7F8'::pg_lsn - '0/16AE7F7'::pg_lsn;
+SELECT '0/16AE7F7'::pg_lsn + 16::numeric;
+SELECT 16::numeric + '0/16AE7F7'::pg_lsn;
+SELECT '0/16AE7F7'::pg_lsn - 16::numeric;
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 1::numeric;
+SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 2::numeric; -- out of range error
+SELECT '0/1'::pg_lsn - 1::numeric;
+SELECT '0/1'::pg_lsn - 2::numeric; -- out of range error
+SELECT '0/0'::pg_lsn + ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn);
+SELECT 'FFFFFFFF/FFFFFFFF'::pg_lsn - ('FFFFFFFF/FFFFFFFF'::pg_lsn - 
'0/0'::pg_lsn);
+SELECT '0/16AE7F7'::pg_lsn + 'NaN'::numeric;
+SELECT '0/16AE7F7'::pg_lsn - 'NaN'::numeric;
 
 -- Check btree and hash opclasses
 EXPLAIN (COSTS OFF)

Reply via email to