lsyldliu commented on code in PR #25109: URL: https://github.com/apache/flink/pull/25109#discussion_r1689382425
##########
docs/data/sql_functions.yml:
##########
@@ -238,6 +238,16 @@ arithmetic:
       NUMERIC.hex()
       STRING.hex()
     description: Returns a string representation of an integer NUMERIC value or a STRING in hex format. Returns NULL if the argument is NULL. E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".
+  - sql: CONV(num, fromBase, toBase)
+    table: num.conv(fromBase, toBase)
+    description: |
+      Converts num from fromBase to toBase.
+      The function supports base 2 to base 36, fromBase in [2, 36], ABS(toBase) in [2,36], otherwise the function returns null.
+      If toBase is negative, num is interpreted as a signed number, otherwise it is treated as an unsigned number. The result is consistent with this rule.
+      If num overflows, the function raises an error.
+      num [<INTEGER_NUMERIC> | <CHARACTER_STRING>], fromBase [<TINYINT> | <SMALLINT> | <INTEGER>], toBase [<TINYINT> | <SMALLINT> | <INTEGER>]

Review Comment:
About `num`: I think we should tell users the exact types instead of the logical type family, since users are not familiar with it.
About `fromBase` and `toBase`: according to the Jira issue description, if Integer represents all integer numeric types, BIGINT should also be supported.

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -1817,6 +1817,29 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                     .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
                     .build();
 
+    public static final BuiltInFunctionDefinition CONV =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("CONV")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Arrays.asList("num", "fromBase", "toBase"),
+                                    Arrays.asList(
+                                            or(
+                                                    logical(LogicalTypeFamily.INTEGER_NUMERIC),
+                                                    logical(LogicalTypeFamily.CHARACTER_STRING)),
+                                            or(
+                                                    logical(LogicalTypeRoot.TINYINT),

Review Comment:
I think we should also support the `bigint` type; I see Spark supports it as well.
So could the code be optimized as follows?
```
or(logical(LogicalTypeFamily.INTEGER_NUMERIC)),
or(logical(LogicalTypeFamily.INTEGER_NUMERIC)))))
```

##########
docs/data/sql_functions.yml:
##########
@@ -238,6 +238,16 @@ arithmetic:
       NUMERIC.hex()
       STRING.hex()
     description: Returns a string representation of an integer NUMERIC value or a STRING in hex format. Returns NULL if the argument is NULL. E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".
+  - sql: CONV(num, fromBase, toBase)
+    table: num.conv(fromBase, toBase)
+    description: |
+      Converts num from fromBase to toBase.

Review Comment:
```suggestion
      Converts num from fromBase to toBase.
```

##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ConvFunction.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.BaseConversionUtils;
+
+import javax.annotation.Nullable;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#CONV}. */
+@Internal
+public class ConvFunction extends BuiltInScalarFunction {
+
+    public ConvFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.CONV, context);
+    }
+
+    public static @Nullable StringData eval(

Review Comment:
Remove static

##########
docs/data/sql_functions.yml:
##########
@@ -238,6 +238,16 @@ arithmetic:
       NUMERIC.hex()
       STRING.hex()
     description: Returns a string representation of an integer NUMERIC value or a STRING in hex format. Returns NULL if the argument is NULL. E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".
+  - sql: CONV(num, fromBase, toBase)
+    table: num.conv(fromBase, toBase)
+    description: |
+      Converts num from fromBase to toBase.
+      The function supports base 2 to base 36, fromBase in [2, 36], ABS(toBase) in [2,36], otherwise the function returns null.
+      If toBase is negative, num is interpreted as a signed number, otherwise it is treated as an unsigned number. The result is consistent with this rule.
+      If num overflows, the function raises an error.
+      num [<INTEGER_NUMERIC> | <CHARACTER_STRING>], fromBase [<TINYINT> | <SMALLINT> | <INTEGER>], toBase [<TINYINT> | <SMALLINT> | <INTEGER>]
+      Returns a STRING, null if any of arguments are null.

Review Comment:
```suggestion
      Returns a STRING representation of the converted num, NULL if any of the arguments are NULL.
```

##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ConvFunction.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.runtime.functions.BaseConversionUtils;
+
+import javax.annotation.Nullable;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#CONV}. */
+@Internal
+public class ConvFunction extends BuiltInScalarFunction {
+
+    public ConvFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.CONV, context);
+    }
+
+    public static @Nullable StringData eval(
+            @Nullable Number num, @Nullable Number fromBase, @Nullable Number toBase) {
+        if (num == null || fromBase == null || toBase == null) {
+            return null;
+        }
+        return BinaryStringData.fromString(
+                BaseConversionUtils.conv(
+                        String.valueOf(num.longValue()).getBytes(UTF_8),
+                        fromBase.intValue(),
+                        toBase.intValue()));
+    }
+
+    public static @Nullable StringData eval(

Review Comment:
ditto

##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/BaseConversionUtils.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * This is ported from Hive: org.apache.hadoop.hive.ql.udf.UDFConv.java, but with modified overflow
+ * handling logic.
+ */
+public class BaseConversionUtils {
+
+    /**
+     * The output string has a max length of one char per bit in the 64-bit `Long` intermediate
+     * representation plus one char for the '-' sign. This happens in practice when converting
+     * `Long.MinValue` with `toBase` equal to -2.
+     */
+    private static final int MAX_OUTPUT_LENGTH = Long.SIZE + 1;
+
+    /**
+     * Divide x by m as if x is an unsigned 64-bit integer. Examples: unsignedLongDiv(-1, 2) ==
+     * Long.MAX_VALUE unsignedLongDiv(6, 3) == 2 unsignedLongDiv(0, 5) == 0
+     *
+     * @param x is treated as unsigned
+     * @param m is treated as signed
+     */
+    private static long unsignedLongDiv(long x, int m) {
+        if (x >= 0) {
+            return x / m;
+        }
+
+        // Let uval be the value of the unsigned long with the same bits as x
+        // Two's complement => x = uval - 2 * MAX - 2
+        // => uval = x + 2 * MAX + 2
+        // Now, use the fact: (a + b) / c = a / c + b / c + (a % c + b % c) / c
+
+        // remainder < 0 is invalid for unsigned long. Therefore, div result should be decremented
+        // by 1 if remainder % m < 0
+        long remainder = (x % m + 2 * (Long.MAX_VALUE % m) + 2 % m);
+        if (remainder % m < 0) {
+            remainder -= m;
+        }
+
+        return x / m + 2 * (Long.MAX_VALUE / m) + 2 / m + remainder / m;
+    }
+
+    /**
+     * Decode val into value[].
+     *
+     * @param val is treated as an unsigned 64-bit integer
+     * @param radix must be between MIN_RADIX and MAX_RADIX
+     */
+    private static void decode(byte[] value, long val, int radix) {
+        Arrays.fill(value, (byte) 0);
+        for (int i = value.length - 1; val != 0; i--) {
+            long q = unsignedLongDiv(val, radix);
+            value[i] = (byte) (val - q * radix);
+            val = q;
+        }
+    }
+
+    /**
+     * Convert value[] into a long. On overflow, return -1 (as MySQL does).
+     *
+     * @param radix must be between MIN_RADIX and MAX_RADIX
+     * @param fromPos is the first element that should be considered
+     * @return the result should be treated as an unsigned 64-bit integer.
+     */
+    private static long encode(byte[] value, int radix, int fromPos) {
+        long val = 0;
+
+        // bound will always be positive since radix >= 2.
+        // -1 is reserved to indicate overflows.
+        // possible overflow once.
+        long bound = unsignedLongDiv(-1 - radix, radix);
+
+        for (int i = fromPos; i < value.length && value[i] >= 0; i++) {
+            // val < 0 means its bit presentation starts with 1, val * radix will cause overflow
+            if (val < 0) {
+                return -1;
+            }
+
+            // bound is not accurate enough, our target is checking whether val * radix + value(i)
+            // can cause overflow or not.
+            // Just like bound, (-1 - value(i)) / radix will be positive, and we can easily check
+            // overflow by checking (-1 - value(i)) / radix < val or not.
+            if (val >= bound && unsignedLongDiv(-1 - value[i], radix) < val) {
+                return -1;
+            }
+
+            val = val * radix + value[i];
+        }
+        return val;
+    }
+
+    /**
+     * Convert the bytes in value[] to the corresponding chars.
+     *
+     * @param radix must be between MIN_RADIX and MAX_RADIX
+     * @param fromPos is the first nonzero element
+     */
+    private static void byte2char(byte[] value, int radix, int fromPos) {
+        for (int i = fromPos; i < value.length; i++) {
+            value[i] = (byte) Character.toUpperCase(Character.forDigit(value[i], radix));
+        }
+    }
+
+    /**
+     * Convert the chars in value[] to the corresponding integers. If invalid character is found,
+     * convert it to -1 and ignore the suffix starting there.
+     *
+     * @param radix must be between MIN_RADIX and MAX_RADIX
+     * @param fromPos is the first nonzero element
+     */
+    private static void char2byte(byte[] value, int radix, int fromPos) {
+        for (int i = fromPos; i < value.length; i++) {
+            value[i] = (byte) Character.digit(value[i], radix);
+            if (value[i] == -1) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Convert numbers between different number bases. If {@code toBase} is negative, {@code num} is
+     * interpreted as a signed number, otherwise it is treated as an unsigned number. The result is
+     * consistent with this rule.
+     */
+    public static String conv(byte[] num, int fromBase, int toBase) {
+        if (num == null || num.length < 1) {
+            return null;
+        }
+
+        if (fromBase < Character.MIN_RADIX
+                || fromBase > Character.MAX_RADIX
+                || Math.abs(toBase) < Character.MIN_RADIX
+                || Math.abs(toBase) > Character.MAX_RADIX) {
+            return null;
+        }
+
+        boolean negative = (num[0] == '-');
+        int first = negative ? 1 : 0;
+
+        byte[] value = new byte[Math.max(num.length, MAX_OUTPUT_LENGTH)];
+        System.arraycopy(num, first, value, value.length - num.length + first, num.length - first);
+        char2byte(value, fromBase, value.length - num.length + first);
+
+        // do the conversion by going through a 64-bit integer
+        long val = encode(value, fromBase, value.length - num.length + first);
+        if (val == -1) {
+            throw new NumberFormatException(

Review Comment:
ArithmeticException

##########
docs/data/sql_functions_zh.yml:
##########
@@ -296,6 +296,15 @@ arithmetic:
     description: |
       以十六进制格式返回整数 numeric 或字符串 string 的字符串表示。如果参数为 `NULL`,则返回 `NULL`。
       例如数字 20 返回“14”,数字 100 返回“64”,字符串“hello,world” 返回“68656C6C6F2C776F726C64”。
+  - sql: CONV(num, fromBase, toBase)
+    table: num.conv(fromBase, toBase)
+    description: |
+      将 num 从 fromBase 进制转换为 toBase 进制。
+      该函数支持 2 进制到 36 进制,fromBase 应在 [2, 36] 范围内,toBase 的绝对值应在[2, 36]范围内,否则函数将会返回 null。
+      如果 toBase 为负数,则 num 被视为有符号数处理;如果为正数,则视为无符号数处理。转换结果与此规则保持一致。
+      如果 num 溢出,函数将抛出错误。
+      num [<INTEGER_NUMERIC> | <CHARACTER_STRING>], fromBase [<TINYINT> | <SMALLINT> | <INTEGER>], toBase [<TINYINT> | <SMALLINT> | <INTEGER>]
+      函数返回一个字符串结果。如果任何参数为null,则返回null。

Review Comment:
如果任何参数为null,则返回null -> 如果任何参数为 NULL,则返回 NULL。
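
For reference, the signed/unsigned rule described in the CONV documentation can be sketched with JDK methods only (`Long.parseUnsignedLong`, `Long.toUnsignedString`, `Long.toString`). This is a minimal sketch under stated assumptions, not the PR's `BaseConversionUtils`: the class name `ConvSketch` and its `conv` helper are hypothetical, invalid digits are rejected rather than treated as the end of the number, and overflow surfaces as the JDK's `NumberFormatException`.

```java
import java.util.Locale;

/** Hypothetical illustration of the documented CONV rules; not Flink's implementation. */
class ConvSketch {

    static String conv(String num, int fromBase, int toBase) {
        if (num == null || num.isEmpty()) {
            return null;
        }
        int absToBase = Math.abs(toBase);
        // Bases outside [2, 36] yield null, as the documentation states.
        if (fromBase < Character.MIN_RADIX
                || fromBase > Character.MAX_RADIX
                || absToBase < Character.MIN_RADIX
                || absToBase > Character.MAX_RADIX) {
            return null;
        }
        boolean negative = num.charAt(0) == '-';
        // Parse the magnitude as an unsigned 64-bit value. The JDK throws
        // NumberFormatException on overflow or on an invalid digit (assumption:
        // this sketch does not ignore an invalid suffix the way the PR does).
        long magnitude = Long.parseUnsignedLong(negative ? num.substring(1) : num, fromBase);
        long val = negative ? -magnitude : magnitude;
        // Negative toBase: render as signed. Positive toBase: render the same bits as unsigned.
        String digits =
                toBase < 0 ? Long.toString(val, absToBase) : Long.toUnsignedString(val, absToBase);
        return digits.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(conv("100", 10, 16)); // 64
        System.out.println(conv("-1", 10, 16)); // FFFFFFFFFFFFFFFF
        System.out.println(conv("-1", 10, -16)); // -1
        System.out.println(conv("11", 2, 37)); // null
    }
}
```

With a positive `toBase`, -1 is rendered as the unsigned 64-bit value; with a negative `toBase`, the sign is kept.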