lifengchao created FLINK-33963:
----------------------------------
Summary: There is only one UDF instance after serializing the same
task
Key: FLINK-33963
URL: https://issues.apache.org/jira/browse/FLINK-33963
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.18.0
Environment: local env in idea test.
java 8
Reporter: lifengchao
Fix For: 1.18.0
I define this UDF and expect the following SQL to return 'a', 'b', but it
returns 'a', 'a'.
```java
public class UdfSerializeFunc extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}
public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}
}
```
```
select
name,
udf_ser(name, 'a') name1,
udf_ser(name, 'b') name2
from heros
```
Changing the UDF to the following achieves the expected results.
```java
public class UdfSerializeFunc2 extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}
public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.outputTypeStrategy(new TypeStrategy() {
@Override
public Optional<DataType> inferType(CallContext callContext) {
List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
if (argumentDataTypes.size() != 2) {
throw callContext.newValidationError("arg size error");
}
if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
throw callContext.newValidationError("Literal expected for second argument.");
}
cache = callContext.getArgumentValue(1, String.class).get();
return Optional.of(DataTypes.STRING());
}
})
.build();
}
}
```
My complete test code:
```
public class UdfSerializeFunc extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}
public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}
}
public class UdfSerializeFunc2 extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}
public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.outputTypeStrategy(new TypeStrategy() {
@Override
public Optional<DataType> inferType(CallContext callContext) {
List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
if (argumentDataTypes.size() != 2) {
throw callContext.newValidationError("arg size error");
}
if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
throw callContext.newValidationError("Literal expected for second argument.");
}
cache = callContext.getArgumentValue(1, String.class).get();
return Optional.of(DataTypes.STRING());
}
})
.build();
}
}
class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll{
var env: StreamExecutionEnvironment = _
var tEnv: StreamTableEnvironment = _
override protected def beforeAll(): Unit = {
val conf = new Configuration()
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
env.setParallelism(2)
env.getConfig.enableObjectReuse()
tEnv = StreamTableEnvironment.create(env)
}
/**
* There are 2 tasks, and each task ends up with only one UDF instance: udf_ser(name, 'a') and udf_ser(name, 'b') are not distinguished.
* The function serialization is broken here: the two udf_ser calls within a single task deserialize to the same object instead of two separate ones.
* Modifying a property of the UDF inside getTypeInference produces two distinct objects.
*/
test("UdfSerializeFunc"){
tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])
var sql = """
CREATE TEMPORARY TABLE heros (
`name` STRING,
`power` STRING,
`age` INT
) WITH (
'connector' = 'faker',
'fields.name.expression' = '#\{superhero.name}',
'fields.power.expression' = '#\{superhero.power}',
'fields.power.null-rate' = '0.05',
'rows-per-second' = '1',
'fields.age.expression' = '#\{number.numberBetween ''0'',''1000''}'
)
"""
tEnv.executeSql(sql)
sql = """
select
udf_ser(name, 'a') name1,
udf_ser(name, 'b') name2
from heros
"""
val rstTable = tEnv.sqlQuery(sql)
rstTable.printSchema()
rstTable.execute().print()
}
/**
* Modifying a property of the ScalarFunction makes the instances deserialize as different objects.
*/
test("UdfSerializeFunc2"){
tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])
var sql = """
CREATE TEMPORARY TABLE heros (
`name` STRING,
`power` STRING,
`age` INT
) WITH (
'connector' = 'faker',
'fields.name.expression' = '#\{superhero.name}',
'fields.power.expression' = '#\{superhero.power}',
'fields.power.null-rate' = '0.05',
'rows-per-second' = '1',
'fields.age.expression' = '#\{number.numberBetween ''0'',''1000''}'
)
"""
tEnv.executeSql(sql)
sql = """
select
udf_ser(name, 'a') name1,
udf_ser(name, 'b') name2
from heros
"""
val rstTable = tEnv.sqlQuery(sql)
rstTable.printSchema()
rstTable.execute().print()
}
override protected def afterAll(): Unit = {
env.execute()
}
}
```
test UdfSerializeFunc log out:
```
(
`name1` STRING,
`name2` STRING
)
14:44:41,183 DEBUG
org.apache.flink.table.planner.codegen.OperatorCodeGenerator$
[ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator
Code:
StreamExecCalc
14:44:42,819 WARN
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager
[ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping
notifications
14:44:43,092 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
[ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
14:44:43,240 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils
[ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable
'log.file' is not set.
14:44:43,240 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils
[ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are
unavailable in the web dashboard. Log file location not found in environment
variable 'log.file' or configuration key 'web.log.path'.
14:44:43,711 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
[ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at
localhost:8081
14:44:43,716 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
[ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at
http://localhost:8081.
14:44:43,717 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
[mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership
with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565
14:44:43,789 WARN
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager
[flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping
notifications
14:44:43,790 WARN
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager
[flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started
because either no tokens obtained or none of the tokens specified its renewal
date
14:44:44,576 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source:
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468.
14:44:44,576 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source:
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:1737783673.
14:44:44,607 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source:
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - cache_null.cache:a
14:44:44,607 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source:
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - cache_null.cache:a
+----+--------------------------------+--------------------------------+
| op | name1 | name2 |
+----+--------------------------------+--------------------------------+
| +I | a | a |
| +I | a | a |
| +I | a | a |
| +I | a | a |
| +I | a | a |
| +I | a | a |
| +I | a | a |
| +I | a | a |
```
test UdfSerializeFunc2 log out:
```
(
`name1` STRING,
`name2` STRING
)
14:45:18,786 DEBUG
org.apache.flink.table.planner.codegen.OperatorCodeGenerator$
[ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator
Code:
StreamExecCalc
14:45:20,296 WARN
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager
[ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping
notifications
14:45:20,518 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
[ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
14:45:20,635 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils
[ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable
'log.file' is not set.
14:45:20,635 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils
[ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are
unavailable in the web dashboard. Log file location not found in environment
variable 'log.file' or configuration key 'web.log.path'.
14:45:21,032 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
[ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at
localhost:8081
14:45:21,034 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
[ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at
http://localhost:8081.
14:45:21,035 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
[mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership
with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f
14:45:21,089 WARN
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager
[flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping
notifications
14:45:21,089 WARN
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager
[flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started
because either no tokens obtained or none of the tokens specified its renewal
date
14:45:21,741 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source:
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1439144392.
14:45:21,741 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source:
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409.
14:45:21,742 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source:
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1162638327.
14:45:21,742 WARN com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source:
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806.
+----+--------------------------------+--------------------------------+
| op | name1 | name2 |
+----+--------------------------------+--------------------------------+
| +I | a | b |
| +I | a | b |
| +I | a | b |
| +I | a | b |
```
*This issue is caused by UDF function serialization.*
--
This message was sent by Atlassian Jira
(v8.20.10#820010)