tsreaper commented on a change in pull request #17634: URL: https://github.com/apache/flink/pull/17634#discussion_r742597700
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeMerging.java ########## @@ -294,26 +294,42 @@ public static LogicalType findSumAggType(LogicalType argType) { } // -------------------------------------------------------------------------------------------- + // + // Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result + // precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the + // integral part of a result from being truncated. + // + // https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql + // - /** - * Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result - * precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the - * integral part of a result from being truncated. - * - * <p>https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql - */ - private static DecimalType adjustPrecisionScale(int precision, int scale) { + private static DecimalType adjustMulDivPrecisionScale(int precision, int scale) { if (precision <= DecimalType.MAX_PRECISION) { // Adjustment only needed when we exceed max precision return new DecimalType(false, precision, scale); } else { - int digitPart = precision - scale; - // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value; - // otherwise preserve at least MINIMUM_ADJUSTED_SCALE fractional digits - int minScalePart = Math.min(scale, MINIMUM_ADJUSTED_SCALE); - int adjustScale = Math.max(DecimalType.MAX_PRECISION - digitPart, minScalePart); - return new DecimalType(false, DecimalType.MAX_PRECISION, adjustScale); + int adjustedScale; + if (precision - scale <= 32) { + // Resulting scale is min(scale, 38 - (precision - scale) + adjustedScale = Math.min(scale, DecimalType.MAX_PRECISION - (precision - scale)); + } else { + // Keep at least 6 decimal digits Review comment: at least -> at most ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeMerging.java ########## @@ -294,26 +294,42 @@ public static LogicalType findSumAggType(LogicalType argType) { } // -------------------------------------------------------------------------------------------- + // + // Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result + // precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the + // integral part of a result from being truncated. + // + // https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql + // - /** - * Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result - * precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the - * integral part of a result from being truncated. - * - * <p>https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql - */ - private static DecimalType adjustPrecisionScale(int precision, int scale) { + private static DecimalType adjustMulDivPrecisionScale(int precision, int scale) { if (precision <= DecimalType.MAX_PRECISION) { // Adjustment only needed when we exceed max precision return new DecimalType(false, precision, scale); } else { - int digitPart = precision - scale; - // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value; - // otherwise preserve at least MINIMUM_ADJUSTED_SCALE fractional digits - int minScalePart = Math.min(scale, MINIMUM_ADJUSTED_SCALE); - int adjustScale = Math.max(DecimalType.MAX_PRECISION - digitPart, minScalePart); - return new DecimalType(false, DecimalType.MAX_PRECISION, adjustScale); + int adjustedScale; + if (precision - scale <= 32) { Review comment: or maybe just change `MINIMUM_ADJUSTED_SCALE` to `6` as it is only used in this method. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeMerging.java ########## @@ -294,26 +294,42 @@ public static LogicalType findSumAggType(LogicalType argType) { } // -------------------------------------------------------------------------------------------- + // + // Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result + // precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the + // integral part of a result from being truncated. + // + // https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql + // - /** - * Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result - * precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the - * integral part of a result from being truncated. - * - * <p>https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql - */ - private static DecimalType adjustPrecisionScale(int precision, int scale) { + private static DecimalType adjustMulDivPrecisionScale(int precision, int scale) { if (precision <= DecimalType.MAX_PRECISION) { // Adjustment only needed when we exceed max precision return new DecimalType(false, precision, scale); } else { - int digitPart = precision - scale; - // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value; - // otherwise preserve at least MINIMUM_ADJUSTED_SCALE fractional digits - int minScalePart = Math.min(scale, MINIMUM_ADJUSTED_SCALE); - int adjustScale = Math.max(DecimalType.MAX_PRECISION - digitPart, minScalePart); - return new DecimalType(false, DecimalType.MAX_PRECISION, adjustScale); + int adjustedScale; + if (precision - scale <= 32) { + // Resulting scale is min(scale, 38 - (precision - scale) + adjustedScale = Math.min(scale, DecimalType.MAX_PRECISION - (precision - scale)); + } else { + // Keep at least 6 decimal digits + adjustedScale = Math.min(MINIMUM_ADJUSTED_SCALE, scale); + } + return new DecimalType(false, DecimalType.MAX_PRECISION, adjustedScale); + } + } + + private static DecimalType adjustAddPrecisionScale( + int precision, int scale, int integral1, int integral2) { + final int adjustedPrecision = Math.min(precision, DecimalType.MAX_PRECISION); + final int adjustedScale; + final int maxIntegral = Math.max(integral1, integral2); Review comment: Why not `Math.max(integral1, integral2) + 1`? I know this is written in the documentation of TSQL but they state that > In addition and subtraction operations, we need max(p1 - s1, p2 - s2) places to store integral part of the decimal number. which does not take carrying into consideration. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeMerging.java ########## @@ -294,26 +294,42 @@ public static LogicalType findSumAggType(LogicalType argType) { } // -------------------------------------------------------------------------------------------- + // + // Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result + // precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the + // integral part of a result from being truncated. + // + // https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql + // - /** - * Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result - * precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the - * integral part of a result from being truncated. - * - * <p>https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql - */ - private static DecimalType adjustPrecisionScale(int precision, int scale) { + private static DecimalType adjustMulDivPrecisionScale(int precision, int scale) { if (precision <= DecimalType.MAX_PRECISION) { // Adjustment only needed when we exceed max precision return new DecimalType(false, precision, scale); } else { - int digitPart = precision - scale; - // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value; - // otherwise preserve at least MINIMUM_ADJUSTED_SCALE fractional digits - int minScalePart = Math.min(scale, MINIMUM_ADJUSTED_SCALE); - int adjustScale = Math.max(DecimalType.MAX_PRECISION - digitPart, minScalePart); - return new DecimalType(false, DecimalType.MAX_PRECISION, adjustScale); + int adjustedScale; + if (precision - scale <= 32) { Review comment: Give this `32` a name just like `MINIMUM_ADJUSTED_SCALE`? ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeMerging.java ########## @@ -294,26 +294,42 @@ public static LogicalType findSumAggType(LogicalType argType) { } // -------------------------------------------------------------------------------------------- + // + // Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result + // precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the + // integral part of a result from being truncated. + // + // https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql + // - /** - * Scale adjustment implementation is inspired to SQLServer's one. In particular, when a result - * precision is greater than MAX_PRECISION, the corresponding scale is reduced to prevent the - * integral part of a result from being truncated. - * - * <p>https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql - */ - private static DecimalType adjustPrecisionScale(int precision, int scale) { + private static DecimalType adjustMulDivPrecisionScale(int precision, int scale) { if (precision <= DecimalType.MAX_PRECISION) { // Adjustment only needed when we exceed max precision return new DecimalType(false, precision, scale); } else { - int digitPart = precision - scale; - // If original scale is less than MINIMUM_ADJUSTED_SCALE, use original scale value; - // otherwise preserve at least MINIMUM_ADJUSTED_SCALE fractional digits - int minScalePart = Math.min(scale, MINIMUM_ADJUSTED_SCALE); - int adjustScale = Math.max(DecimalType.MAX_PRECISION - digitPart, minScalePart); - return new DecimalType(false, DecimalType.MAX_PRECISION, adjustScale); + int adjustedScale; + if (precision - scale <= 32) { + // Resulting scale is min(scale, 38 - (precision - scale) + adjustedScale = Math.min(scale, DecimalType.MAX_PRECISION - (precision - scale)); + } else { + // Keep at least 6 decimal digits + adjustedScale = Math.min(MINIMUM_ADJUSTED_SCALE, scale); + } + return new DecimalType(false, DecimalType.MAX_PRECISION, adjustedScale); + } + } + + private static DecimalType adjustAddPrecisionScale( + int precision, int scale, int integral1, int integral2) { + final int adjustedPrecision = Math.min(precision, DecimalType.MAX_PRECISION); + final int adjustedScale; + final int maxIntegral = Math.max(integral1, integral2); + if (maxIntegral >= adjustedPrecision - scale) { + adjustedScale = adjustedPrecision - maxIntegral; + } else { + adjustedScale = scale; Review comment: not covered by tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org