lifengchao created FLINK-33963:
----------------------------------

             Summary: There is only one UDF instance after serializing the same 
task
                 Key: FLINK-33963
                 URL: https://issues.apache.org/jira/browse/FLINK-33963
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.18.0
         Environment: local env in idea test.

java 8
            Reporter: lifengchao
             Fix For: 1.18.0


I defined this UDF and expected the following SQL to return 'a', 'b', but it
returns 'a', 'a'.

```java
public class UdfSerializeFunc extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}

public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}
}
```

```
select
name,
udf_ser(name, 'a') name1,
udf_ser(name, 'b') name2
from heros
```

Changing the UDF to the following achieves the expected results.

```java
public class UdfSerializeFunc2 extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}

public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.outputTypeStrategy(new TypeStrategy() {
@Override
public Optional<DataType> inferType(CallContext callContext) {
List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
if (argumentDataTypes.size() != 2) {
throw callContext.newValidationError("arg size error");
}
if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
throw callContext.newValidationError("Literal expected for second argument.");
}
cache = callContext.getArgumentValue(1, String.class).get();
return Optional.of(DataTypes.STRING());
}
})
.build();
}
}
```

 

My complete test code:

```
public class UdfSerializeFunc extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}

public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}
}
 
public class UdfSerializeFunc2 extends ScalarFunction {
static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
String cache;
@Override
public void open(FunctionContext context) throws Exception {
LOG.warn("open:{}.", this.hashCode());
}

public String eval(String a, String b){
if(cache == null){
LOG.warn("cache_null.cache:{}", b);
cache = b;
}
return cache;
}

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.outputTypeStrategy(new TypeStrategy() {
@Override
public Optional<DataType> inferType(CallContext callContext) {
List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
if (argumentDataTypes.size() != 2) {
throw callContext.newValidationError("arg size error");
}
if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
throw callContext.newValidationError("Literal expected for second argument.");
}
cache = callContext.getArgumentValue(1, String.class).get();
return Optional.of(DataTypes.STRING());
}
})
.build();
}
}
 
class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll{
var env: StreamExecutionEnvironment = _
var tEnv: StreamTableEnvironment = _

override protected def beforeAll(): Unit = {
val conf = new Configuration()
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
env.setParallelism(2)
env.getConfig.enableObjectReuse()

tEnv = StreamTableEnvironment.create(env)
}

/**
* 2个task,只是每个task有一个udf,udf_ser(name, 'a')和udf_ser(name, 'b')没区分开
* 它这函数的序列化真傻屌,单个task的2个udf_ser序列化后还是同一个对象,不是2个
* getTypeInference中修改udf的属性可以实现2个不同的对象
*/
test("UdfSerializeFunc"){
tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])

var sql = """
CREATE TEMPORARY TABLE heros (
`name` STRING,
`power` STRING,
`age` INT
) WITH (
'connector' = 'faker',
'fields.name.expression' = '#\{superhero.name}',
'fields.power.expression' = '#\{superhero.power}',
'fields.power.null-rate' = '0.05',
'rows-per-second' = '1',
'fields.age.expression' = '#\{number.numberBetween ''0'',''1000''}'
)
"""
tEnv.executeSql(sql)

sql = """
select
udf_ser(name, 'a') name1,
udf_ser(name, 'b') name2
from heros
"""
val rstTable = tEnv.sqlQuery(sql)
rstTable.printSchema()

rstTable.execute().print()
}

/**
* 修改ScalarFunction的属性,能使之序列化后是不同的对象
*/
test("UdfSerializeFunc2"){
tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])

var sql = """
CREATE TEMPORARY TABLE heros (
`name` STRING,
`power` STRING,
`age` INT
) WITH (
'connector' = 'faker',
'fields.name.expression' = '#\{superhero.name}',
'fields.power.expression' = '#\{superhero.power}',
'fields.power.null-rate' = '0.05',
'rows-per-second' = '1',
'fields.age.expression' = '#\{number.numberBetween ''0'',''1000''}'
)
"""
tEnv.executeSql(sql)

sql = """
select
udf_ser(name, 'a') name1,
udf_ser(name, 'b') name2
from heros
"""
val rstTable = tEnv.sqlQuery(sql)
rstTable.printSchema()

rstTable.execute().print()
}

override protected def afterAll(): Unit = {
env.execute()
}

}
```

Log output of test "UdfSerializeFunc":

```

(
  `name1` STRING,
  `name2` STRING
)
14:44:41,183  DEBUG 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ 
[ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator 
Code:
StreamExecCalc
14:44:42,819  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping 
notifications
14:44:43,092  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 
'log.file' is not set.
14:44:43,240  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are 
unavailable in the web dashboard. Log file location not found in environment 
variable 'log.file' or configuration key 'web.log.path'.
14:44:43,711  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at 
localhost:8081
14:44:43,716  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at 
http://localhost:8081.
14:44:43,717  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership 
with leaderSessionID=18ab2e30-a83a-4ec0-be98-7d49b7628565
14:44:43,789  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping 
notifications
14:44:43,790  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started 
because either no tokens obtained or none of the tokens specified its renewal 
date
14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:969139468.
14:44:44,576  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:1737783673.
14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - cache_null.cache:a
14:44:44,607  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - cache_null.cache:a
+----+--------------------------------+--------------------------------+
| op |                          name1 |                          name2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |
| +I |                              a |                              a |

```

Log output of test "UdfSerializeFunc2":

```

(
  `name1` STRING,
  `name2` STRING
)
14:45:18,786  DEBUG 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator$ 
[ScalaTest-run-running-UdfSerializeSuite] [] - Compiling OneInputStreamOperator 
Code:
StreamExecCalc
14:45:20,296  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[ScalaTest-run-running-UdfSerializeSuite] [] - No tokens obtained so skipping 
notifications
14:45:20,518  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Starting rest endpoint.
14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - Log file environment variable 
'log.file' is not set.
14:45:20,635  WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils 
[ScalaTest-run-running-UdfSerializeSuite] [] - JobManager log files are 
unavailable in the web dashboard. Log file location not found in environment 
variable 'log.file' or configuration key 'web.log.path'.
14:45:21,032  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Rest endpoint listening at 
localhost:8081
14:45:21,034  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[ScalaTest-run-running-UdfSerializeSuite] [] - Web frontend listening at 
http://localhost:8081.
14:45:21,035  INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
[mini-cluster-io-thread-1] [] - http://localhost:8081 was granted leadership 
with leaderSessionID=2fcfdba0-0e36-4e8b-9f3c-88f2c242458f
14:45:21,089  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - No tokens obtained so skipping 
notifications
14:45:21,089  WARN  
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager 
[flink-pekko.actor.default-dispatcher-4] [] - Tokens update task not started 
because either no tokens obtained or none of the tokens specified its renewal 
date
14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1439144392.
14:45:21,741  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:381953409.
14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (1/2)#0] [] - open:1162638327.
14:45:21,742  WARN  com.java.flink.sql.udf.serialize.UdfSerializeFunc2 [Source: 
Source Generator -> heros[1] -> Calc[2] (2/2)#0] [] - open:391248806.
+----+--------------------------------+--------------------------------+
| op |                          name1 |                          name2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                              b |
| +I |                              a |                              b |
| +I |                              a |                              b |
| +I |                              a |                              b |

```

 

*This issue is caused by UDF instance serialization: both calls within the same task share a single deserialized UDF instance.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to