Xinyi Yan created FLINK-29855:
---------------------------------

             Summary: UDF randomly processed input data twice 
                 Key: FLINK-29855
                 URL: https://issues.apache.org/jira/browse/FLINK-29855
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.14.4
            Reporter: Xinyi Yan
         Attachments: IntInputUdf.java, SpendReport.java, example.log

 

To reproduce the issue:
 # Create a datagen table with a single INT column, id.
 # Create a UDF that only checks the input modulo 2 and logs every call.
 # Create a print table that prints the results.
 # Insert into the print table the result of applying the UDF to the id column of the datagen table, keeping only rows where the UDF result is not NULL.

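The logs show that some input rows are processed by the UDF twice, which I don't
believe is expected. Processing the same row twice can completely change the
behavior of a UDF. In the attached log, every input for which the UDF returns a
non-null value (the even numbers) is logged twice, while every input for which it
returns NULL (the odd numbers) is logged only once. I have also attached the main
and UDF classes, as well as the log file, for additional info.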

 

DDL

 
{code:java}
public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().build();
        
        TableEnvironment tEnv = TableEnvironment.create(settings);
        
        tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 
'org.apache.flink.playgrounds.spendreport.IntInputUdf'");        
tEnv.executeSql("CREATE TABLE datagenTable (\n" +
                "    id  INT\n" +
                ") WITH (\n" +
                "    'connector' = 'datagen',\n" +
                "    'number-of-rows' = '100',\n" +
                "    'rows-per-second' = '1'\n" +
                ")");        tEnv.executeSql("CREATE TABLE print_table (\n" +
                "    id_in_bytes  VARBINARY,\n" +
                "    id  INT\n" +
                ") WITH (\n" +
                "    'connector' = 'print'\n" +
                ")");        tEnv.executeSql("INSERT INTO print_table SELECT * 
FROM ( SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS 
ET WHERE ET.`id_in_bytes` IS NOT NULL");
    }  {code}
 

UDF

 
{code:java}
/**
 * Returns the UTF-8 bytes of the decimal string form of an even input; odd inputs map to
 * {@code null}.
 *
 * <p>Every call with a non-null input is logged at INFO level. In the reported query the UDF
 * result is both projected and filtered with {@code IS NOT NULL}. The attached log shows every
 * input that produced a non-null result logged twice, and every input that produced
 * {@code null} logged once. That pattern suggests the call is evaluated once for the filter
 * and again for the projection. NOTE(review): if side effects must run exactly once per row,
 * consider overriding {@code isDeterministic()} to return {@code false} so the planner does
 * not duplicate the expression. Confirm against the Flink version in use.
 *
 * @param intputNum the value to encode; {@code null} (SQL NULL) yields {@code null}
 * @return the UTF-8 bytes of {@code intputNum.toString()} when it is even, otherwise
 *     {@code null}
 */
public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    // SQL NULL input: return NULL rather than throwing a NullPointerException below.
    if (intputNum == null) {
        return null;
    }
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    if (intputNum % 2 == 0) {
        LOG.info(
                "### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ",
                results,
                intputNum);
        return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}
{code}
output

 

 
{code:java}
2022-11-02 13:38:56,765 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:56,766 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:57,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:57,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:58,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:58,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:59,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:02,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:02,762 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:03,758 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
2022-11-02 13:39:04,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
2022-11-02 13:39:05,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
2022-11-02 13:39:06,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
2022-11-02 13:39:07,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:07,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:08,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
2022-11-02 13:39:09,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
2022-11-02 13:39:10,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
2022-11-02 13:39:11,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
2022-11-02 13:39:12,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835.
{code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to