lifengchao created FLINK-33963: ---------------------------------- Summary: There is only one UDF instance after serializing the same task Key: FLINK-33963 URL: https://issues.apache.org/jira/browse/FLINK-33963 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.18.0 Environment: local env in idea test.
java 8 Reporter: lifengchao Fix For: 1.18.0 I define this UDF and expect the following SQL to return 'a', 'b', but it returns 'a', 'a'. ```java public class UdfSerializeFunc extends ScalarFunction { static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class); String cache; @Override public void open(FunctionContext context) throws Exception { LOG.warn("open:{}.", this.hashCode()); } public String eval(String a, String b){ if(cache == null){ LOG.warn("cache_null.cache:{}", b); cache = b; } return cache; } } ``` ``` select name, udf_ser(name, 'a') name1, udf_ser(name, 'b') name2 from heros ``` Changing the UDF to this will achieve the expected results. ```java public class UdfSerializeFunc2 extends ScalarFunction { static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class); String cache; @Override public void open(FunctionContext context) throws Exception { LOG.warn("open:{}.", this.hashCode()); } public String eval(String a, String b){ if(cache == null){ LOG.warn("cache_null.cache:{}", b); cache = b; } return cache; } @Override public TypeInference getTypeInference(DataTypeFactory typeFactory) { return TypeInference.newBuilder() .outputTypeStrategy(new TypeStrategy() { @Override public Optional<DataType> inferType(CallContext callContext) { List<DataType> argumentDataTypes = callContext.getArgumentDataTypes(); if (argumentDataTypes.size() != 2) { throw callContext.newValidationError("arg size error"); } if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) { throw callContext.newValidationError("Literal expected for second argument."); } cache = callContext.getArgumentValue(1, String.class).get(); return Optional.of(DataTypes.STRING()); } }) .build(); } } ``` My complete test code: ``` public class UdfSerializeFunc extends ScalarFunction { static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class); String cache; @Override public void open(FunctionContext context) throws Exception { LOG.warn("open:{}.", 
this.hashCode()); } public String eval(String a, String b){ if(cache == null){ LOG.warn("cache_null.cache:{}", b); cache = b; } return cache; } } public class UdfSerializeFunc2 extends ScalarFunction { static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class); String cache; @Override public void open(FunctionContext context) throws Exception { LOG.warn("open:{}.", this.hashCode()); } public String eval(String a, String b){ if(cache == null){ LOG.warn("cache_null.cache:{}", b); cache = b; } return cache; } @Override public TypeInference getTypeInference(DataTypeFactory typeFactory) { return TypeInference.newBuilder() .outputTypeStrategy(new TypeStrategy() { @Override public Optional<DataType> inferType(CallContext callContext) { List<DataType> argumentDataTypes = callContext.getArgumentDataTypes(); if (argumentDataTypes.size() != 2) { throw callContext.newValidationError("arg size error"); } if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) { throw callContext.newValidationError("Literal expected for second argument."); } cache = callContext.getArgumentValue(1, String.class).get(); return Optional.of(DataTypes.STRING()); } }) .build(); } } class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll{ var env: StreamExecutionEnvironment = _ var tEnv: StreamTableEnvironment = _ override protected def beforeAll(): Unit = { val conf = new Configuration() env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf) env.setParallelism(2) env.getConfig.enableObjectReuse() tEnv = StreamTableEnvironment.create(env) } /** * There are 2 tasks, but each task holds only one UDF instance: udf_ser(name, 'a') and udf_ser(name, 'b') are not distinguished. * After serialization, the two udf_ser calls within a single task still share the same object instead of being two separate ones. * Mutating a UDF field inside getTypeInference produces two distinct objects. */ test("UdfSerializeFunc"){ tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc]) var sql = """ CREATE TEMPORARY TABLE heros ( `name` STRING, `power` STRING, `age` INT ) WITH ( 'connector' = 'faker', 'fields.name.expression' = 
'#\{superhero.name}', 'fields.power.expression' = '#\{superhero.power}', 'fields.power.null-rate' = '0.05', 'rows-per-second' = '1', 'fields.age.expression' = '#\{number.numberBetween ''0'',''1000''}' ) """ tEnv.executeSql(sql) sql = """ select udf_ser(name, 'a') name1, udf_ser(name, 'b') name2 from heros """ val rstTable = tEnv.sqlQuery(sql) rstTable.printSchema() rstTable.execute().print() } /** * Mutating a field of the ScalarFunction makes the serialized copies become distinct objects. */ test("UdfSerializeFunc2"){ tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2]) var sql = """ CREATE TEMPORARY TABLE heros ( `name` STRING, `power` STRING, `age` INT ) WITH ( 'connector' = 'faker', 'fields.name.expression' = '#\{superhero.name}', 'fields.power.expression' = '#\{superhero.power}', 'fields.power.null-rate' = '0.05', 'rows-per-second' = '1', 'fields.age.expression' = '#\{number.numberBetween ''0'',''1000''}' ) """ tEnv.executeSql(sql) sql = """ select udf_ser(name, 'a') name1, udf_ser(name, 'b') name2 from heros """ val rstTable = tEnv.sqlQuery(sql) rstTable.printSchema() rstTable.execute().print() } override protected def afterAll(): Unit = { env.execute() } } ``` test UdfSerializeFunc log out: ``` ( `name1` STRING, `name2` STRING ) 14:44:41,183 DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code: StreamExecCalc 14:44:42,819 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications 14:44:43,092 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint. 14:44:43,240 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set. 
14:44:43,240 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'. 14:44:43,711 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081 14:44:43,716 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081. 14:44:43,717 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565 14:44:43,789 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications 14:44:43,790 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date 14:44:44,576 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468. 14:44:44,576 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:1737783673. 
14:44:44,607 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - cache_null.cache:a 14:44:44,607 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - cache_null.cache:a +----+--------------------------------+--------------------------------+ | op | name1 | name2 | +----+--------------------------------+--------------------------------+ | +I | a | a | | +I | a | a | | +I | a | a | | +I | a | a | | +I | a | a | | +I | a | a | | +I | a | a | | +I | a | a | ``` test UdfSerializeFunc2 log out: ``` ( `name1` STRING, `name2` STRING ) 14:45:18,786 DEBUG org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ [ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator Code: StreamExecCalc 14:45:20,296 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping notifications 14:45:20,518 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint. 14:45:20,635 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 'log.file' is not set. 14:45:20,635 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'. 14:45:21,032 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at localhost:8081 14:45:21,034 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at http://localhost:8081. 
14:45:21,035 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f 14:45:21,089 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping notifications 14:45:21,089 WARN org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date 14:45:21,741 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1439144392. 14:45:21,741 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409. 14:45:21,742 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1162638327. 14:45:21,742 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806. +----+--------------------------------+--------------------------------+ | op | name1 | name2 | +----+--------------------------------+--------------------------------+ | +I | a | b | | +I | a | b | | +I | a | b | | +I | a | b | ``` *This is an issue caused by UDF function serialization.* -- This message was sent by Atlassian Jira (v8.20.10#820010)