tsreaper commented on a change in pull request #17634:
URL: https://github.com/apache/flink/pull/17634#discussion_r742597700



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeMerging.java
##########
@@ -294,26 +294,42 @@ public static LogicalType findSumAggType(LogicalType argType) {
     }
 
     // --------------------------------------------------------------------------------------------
+    //
+    // Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result
+    // precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the
+    // integral part of a result from being truncated.
+    //
+    // https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql
+    //
 
-    /**
-     * Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result
-     * precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the
-     * integral part of a result from being truncated.
-     *
-     * <p>https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql
-     */
-    private static DecimalType adjustPrecisionScale(int precision, int scale) {
+    private static DecimalType adjustMulDivPrecisionScale(int precision, int scale) {
         if (precision <= DecimalType.MAX_PRECISION) {
             // Adjustment only needed when we exceed max precision
             return new DecimalType(false, precision, scale);
         } else {
-            int digitPart = precision - scale;
-            // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value;
-            // otherwise preserve at least MINIMUM_ADJUSTED_SCALE fractional digits
-            int minScalePart = Math.min(scale, MINIMUM_ADJUSTED_SCALE);
-            int adjustScale = Math.max(DecimalType.MAX_PRECISION - digitPart, minScalePart);
-            return new DecimalType(false, DecimalType.MAX_PRECISION, adjustScale);
+            int adjustedScale;
+            if (precision - scale <= 32) {
+                // Resulting scale is min(scale, 38 - (precision - scale)
+                adjustedScale = Math.min(scale, DecimalType.MAX_PRECISION - (precision - scale));
+            } else {
+                // Keep at least 6 decimal digits

Review comment:
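       This should read "at most" rather than "at least": `Math.min(MINIMUM_ADJUSTED_SCALE, scale)` can never return more than 6.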
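
   To illustrate, here is a minimal standalone sketch. The class and method names are made up for this example, and the value 6 is taken from the code comment under review. `Math.min` never returns more than the constant, so the constant acts as an upper bound on the scale, not a lower bound:

```java
// Illustrative sketch only; not the code from the PR.
final class ScaleCapSketch {
    // Assumed to be 6, as stated in the code comment under review.
    static final int MINIMUM_ADJUSTED_SCALE = 6;

    // Mirrors the expression used in the else branch of the diff.
    static int cappedScale(int scale) {
        return Math.min(MINIMUM_ADJUSTED_SCALE, scale);
    }

    public static void main(String[] args) {
        System.out.println(cappedScale(10)); // 6: a larger scale is cut down to 6
        System.out.println(cappedScale(3));  // 3: a smaller scale is kept as-is
    }
}
```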

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeMerging.java
##########
@@ -294,26 +294,42 @@ public static LogicalType findSumAggType(LogicalType argType) {
     }
 
     // --------------------------------------------------------------------------------------------
+    //
+    // Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result
+    // precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the
+    // integral part of a result from being truncated.
+    //
+    // https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql
+    //
 
-    /**
-     * Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result
-     * precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the
-     * integral part of a result from being truncated.
-     *
-     * <p>https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql
-     */
-    private static DecimalType adjustPrecisionScale(int precision, int scale) {
+    private static DecimalType adjustMulDivPrecisionScale(int precision, int scale) {
         if (precision <= DecimalType.MAX_PRECISION) {
             // Adjustment only needed when we exceed max precision
             return new DecimalType(false, precision, scale);
         } else {
-            int digitPart = precision - scale;
-            // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value;
-            // otherwise preserve at least MINIMUM_ADJUSTED_SCALE fractional digits
-            int minScalePart = Math.min(scale, MINIMUM_ADJUSTED_SCALE);
-            int adjustScale = Math.max(DecimalType.MAX_PRECISION - digitPart, minScalePart);
-            return new DecimalType(false, DecimalType.MAX_PRECISION, adjustScale);
+            int adjustedScale;
+            if (precision - scale <= 32) {

Review comment:
       Or maybe just replace `MINIMUM_ADJUSTED_SCALE` with the literal `6`, as it is only used in this method.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeMerging.java
##########
@@ -294,26 +294,42 @@ public static LogicalType findSumAggType(LogicalType argType) {
     }
 
     // --------------------------------------------------------------------------------------------
+    //
+    // Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result
+    // precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the
+    // integral part of a result from being truncated.
+    //
+    // https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql
+    //
 
-    /**
-     * Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result
-     * precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the
-     * integral part of a result from being truncated.
-     *
-     * <p>https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql
-     */
-    private static DecimalType adjustPrecisionScale(int precision, int scale) {
+    private static DecimalType adjustMulDivPrecisionScale(int precision, int scale) {
         if (precision <= DecimalType.MAX_PRECISION) {
             // Adjustment only needed when we exceed max precision
             return new DecimalType(false, precision, scale);
         } else {
-            int digitPart = precision - scale;
-            // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value;
-            // otherwise preserve at least MINIMUM_ADJUSTED_SCALE fractional digits
-            int minScalePart = Math.min(scale, MINIMUM_ADJUSTED_SCALE);
-            int adjustScale = Math.max(DecimalType.MAX_PRECISION - digitPart, minScalePart);
-            return new DecimalType(false, DecimalType.MAX_PRECISION, adjustScale);
+            int adjustedScale;
+            if (precision - scale <= 32) {
+                // Resulting scale is min(scale, 38 - (precision - scale)
+                adjustedScale = Math.min(scale, DecimalType.MAX_PRECISION - (precision - scale));
+            } else {
+                // Keep at least 6 decimal digits
+                adjustedScale = Math.min(MINIMUM_ADJUSTED_SCALE, scale);
+            }
+            return new DecimalType(false, DecimalType.MAX_PRECISION, adjustedScale);
+        }
+    }
+
+    private static DecimalType adjustAddPrecisionScale(
+            int precision, int scale, int integral1, int integral2) {
+        final int adjustedPrecision = Math.min(precision, DecimalType.MAX_PRECISION);
+        final int adjustedScale;
+        final int maxIntegral = Math.max(integral1, integral2);

Review comment:
       Why not `Math.max(integral1, integral2) + 1`? I know this follows the T-SQL documentation, which states:
   
   > In addition and subtraction operations, we need max(p1 - s1, p2 - s2) places to store integral part of the decimal number.
   
   But that does not take carrying into consideration.
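
   A small standalone example of the carry case, using only `java.math.BigDecimal` (the class and method names are illustrative, not from the PR). Both operands have 2 integral digits, but their sum needs 3:

```java
import java.math.BigDecimal;

// Illustrative sketch only; not the code from the PR.
final class AddCarrySketch {
    // Number of digits before the decimal point: precision - scale.
    static int integralDigits(BigDecimal d) {
        return d.precision() - d.scale();
    }

    public static void main(String[] args) {
        BigDecimal a = new BigDecimal("99.9"); // fits DECIMAL(3, 1): 2 integral digits
        BigDecimal b = new BigDecimal("99.9");
        BigDecimal sum = a.add(b);             // 199.8

        System.out.println(integralDigits(a));   // 2
        System.out.println(integralDigits(sum)); // 3: one more than max(p1 - s1, p2 - s2)
    }
}
```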

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeMerging.java
##########
@@ -294,26 +294,42 @@ public static LogicalType findSumAggType(LogicalType argType) {
     }
 
     // --------------------------------------------------------------------------------------------
+    //
+    // Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result
+    // precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the
+    // integral part of a result from being truncated.
+    //
+    // https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql
+    //
 
-    /**
-     * Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result
-     * precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the
-     * integral part of a result from being truncated.
-     *
-     * <p>https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql
-     */
-    private static DecimalType adjustPrecisionScale(int precision, int scale) {
+    private static DecimalType adjustMulDivPrecisionScale(int precision, int scale) {
         if (precision <= DecimalType.MAX_PRECISION) {
             // Adjustment only needed when we exceed max precision
             return new DecimalType(false, precision, scale);
         } else {
-            int digitPart = precision - scale;
-            // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value;
-            // otherwise preserve at least MINIMUM_ADJUSTED_SCALE fractional digits
-            int minScalePart = Math.min(scale, MINIMUM_ADJUSTED_SCALE);
-            int adjustScale = Math.max(DecimalType.MAX_PRECISION - digitPart, minScalePart);
-            return new DecimalType(false, DecimalType.MAX_PRECISION, adjustScale);
+            int adjustedScale;
+            if (precision - scale <= 32) {

Review comment:
       Give this `32` a name just like `MINIMUM_ADJUSTED_SCALE`?
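
   One possible shape, as a sketch: since 32 equals 38 - 6, the threshold could be derived from the two existing constants. The name `MAX_INTEGRAL_DIGITS_WITH_MIN_SCALE` is made up here, the values 38 and 6 are taken from the comments in the diff, and `DecimalType` is replaced by plain ints so the snippet stands alone:

```java
// Illustrative sketch only; not the code from the PR.
final class NamedThresholdSketch {
    static final int MAX_PRECISION = 38; // stands in for DecimalType.MAX_PRECISION
    static final int MINIMUM_ADJUSTED_SCALE = 6;
    // Hypothetical name for the literal 32 in the diff.
    static final int MAX_INTEGRAL_DIGITS_WITH_MIN_SCALE =
            MAX_PRECISION - MINIMUM_ADJUSTED_SCALE;

    // Same branching as adjustMulDivPrecisionScale in the diff; returns only the scale.
    static int adjustedMulDivScale(int precision, int scale) {
        if (precision <= MAX_PRECISION) {
            return scale;
        }
        int integral = precision - scale;
        if (integral <= MAX_INTEGRAL_DIGITS_WITH_MIN_SCALE) {
            return Math.min(scale, MAX_PRECISION - integral);
        }
        return Math.min(MINIMUM_ADJUSTED_SCALE, scale);
    }

    public static void main(String[] args) {
        System.out.println(MAX_INTEGRAL_DIGITS_WITH_MIN_SCALE); // 32
        System.out.println(adjustedMulDivScale(40, 10)); // integral = 30, min(10, 8) = 8
        System.out.println(adjustedMulDivScale(50, 10)); // integral = 40, min(6, 10) = 6
    }
}
```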

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeMerging.java
##########
@@ -294,26 +294,42 @@ public static LogicalType findSumAggType(LogicalType argType) {
     }
 
     // --------------------------------------------------------------------------------------------
+    //
+    // Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result
+    // precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the
+    // integral part of a result from being truncated.
+    //
+    // https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql
+    //
 
-    /**
-     * Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result
-     * precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the
-     * integral part of a result from being truncated.
-     *
-     * <p>https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql
-     */
-    private static DecimalType adjustPrecisionScale(int precision, int scale) {
+    private static DecimalType adjustMulDivPrecisionScale(int precision, int scale) {
         if (precision <= DecimalType.MAX_PRECISION) {
             // Adjustment only needed when we exceed max precision
             return new DecimalType(false, precision, scale);
         } else {
-            int digitPart = precision - scale;
-            // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value;
-            // otherwise preserve at least MINIMUM_ADJUSTED_SCALE fractional digits
-            int minScalePart = Math.min(scale, MINIMUM_ADJUSTED_SCALE);
-            int adjustScale = Math.max(DecimalType.MAX_PRECISION - digitPart, minScalePart);
-            return new DecimalType(false, DecimalType.MAX_PRECISION, adjustScale);
+            int adjustedScale;
+            if (precision - scale <= 32) {
+                // Resulting scale is min(scale, 38 - (precision - scale)
+                adjustedScale = Math.min(scale, DecimalType.MAX_PRECISION - (precision - scale));
+            } else {
+                // Keep at least 6 decimal digits
+                adjustedScale = Math.min(MINIMUM_ADJUSTED_SCALE, scale);
+            }
+            return new DecimalType(false, DecimalType.MAX_PRECISION, adjustedScale);
+        }
+    }
+
+    private static DecimalType adjustAddPrecisionScale(
+            int precision, int scale, int integral1, int integral2) {
+        final int adjustedPrecision = Math.min(precision, DecimalType.MAX_PRECISION);
+        final int adjustedScale;
+        final int maxIntegral = Math.max(integral1, integral2);
+        if (maxIntegral >= adjustedPrecision - scale) {
+            adjustedScale = adjustedPrecision - maxIntegral;
+        } else {
+            adjustedScale = scale;

Review comment:
       This branch is not covered by tests.
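
   A sketch of inputs that would reach this branch. The method body is copied from the diff into a standalone class that returns only the scale, because the real method is private and returns a `DecimalType`. The class name, method name, and int return type are assumptions for illustration; an actual test would have to go through whichever public method calls it:

```java
// Illustrative sketch only; not the code from the PR.
final class AddScaleBranchSketch {
    static final int MAX_PRECISION = 38; // stands in for DecimalType.MAX_PRECISION

    // Same logic as adjustAddPrecisionScale in the diff; returns only the scale.
    static int adjustedAddScale(int precision, int scale, int integral1, int integral2) {
        final int adjustedPrecision = Math.min(precision, MAX_PRECISION);
        final int maxIntegral = Math.max(integral1, integral2);
        if (maxIntegral >= adjustedPrecision - scale) {
            return adjustedPrecision - maxIntegral;
        } else {
            return scale; // the branch flagged as untested
        }
    }

    public static void main(String[] args) {
        // else branch: maxIntegral = 4 < 10 - 2 = 8, so the scale is unchanged.
        System.out.println(adjustedAddScale(10, 2, 3, 4));    // 2
        // if branch: maxIntegral = 30 >= 38 - 10 = 28, so the scale shrinks.
        System.out.println(adjustedAddScale(40, 10, 30, 20)); // 8
    }
}
```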



