The datagen connector may produce rows with the same values.

From my side, a UDF in Flink shouldn't process the same row twice;
otherwise, it would be a critical bug.
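A rough birthday-problem estimate helps weigh the "same values" explanation. It assumes, as an approximation the thread does not confirm, that the random id values are independent and uniform over the full 32-bit INT range (datagen's default min/max for a random INT field are the type's bounds), with the 100 rows set by 'number-of-rows'. The class and method names are illustrative only.

```java
final class DatagenCollision {
    /**
     * Birthday-problem probability that at least two of n independent, uniform draws
     * from a space of spaceSize values are equal.
     */
    static double collisionProbability(int n, double spaceSize) {
        double allDistinct = 1.0;
        for (int i = 1; i < n; i++) {
            // The i-th draw must avoid the i values drawn before it.
            allDistinct *= 1.0 - i / spaceSize;
        }
        return 1.0 - allDistinct;
    }
}
```

collisionProbability(100, Math.pow(2, 32)) comes out at about 1.15e-6. The timestamps point the same way: at 'rows-per-second' = '1', a repeated generated value would show up at least about a second after the first one, whereas each pair of identical log lines below is at most 1 ms apart.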

Best regards, 
Yuxia 


From: "Xinyi Yan" <yanxi...@apache.org>
To: "User" <user@flink.apache.org>
Sent: Thursday, November 3, 2022 6:59:20 AM
Subject: Question about UDF randomly processed input row twice

Hi all,
I found a weird UDF behavior: a single thread calls the UDF twice for the same
input row. See FLINK-29855 (https://issues.apache.org/jira/browse/FLINK-29855)
for more details. Basically, I created a datagen table with a random integer
column (1 row per second) and passed each value into the UDF. Inside the UDF, I
take the input number mod 2, convert the integer to a byte array, and then log
it for debugging purposes. As you can see below, some rows are passed to the
UDF twice. I'm not sure whether this duplicated UDF call is expected, or why it
happens only for some rows rather than for all of them. In case the local
environment setup matters: my local Flink cluster has only 1 TaskManager with
1 task slot.
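One way to rule out repeated input values entirely is to replace the random id column with a bounded sequence, so every id is unique and any id logged twice must come from a second call. The sketch below only builds the DDL string for such a variant of the datagenTable definition in this mail; the helper class is hypothetical, while 'fields.id.kind', 'fields.id.start' and 'fields.id.end' are the datagen connector's per-field sequence options.

```java
final class SequenceDatagenDdl {
    /**
     * Variant of datagenTable whose id column is the sequence 1..100 instead of
     * random values, emitted at one row per second.
     */
    static String ddl() {
        return "CREATE TABLE datagenTable (\n"
                + "    id  INT\n"
                + ") WITH (\n"
                + " 'connector' = 'datagen',\n"
                + " 'rows-per-second' = '1',\n"
                + " 'fields.id.kind' = 'sequence',\n"
                + " 'fields.id.start' = '1',\n"
                + " 'fields.id.end' = '100'\n"
                + ")";
    }
}
```

It would be used as tEnv.executeSql(SequenceDatagenDdl.ddl()) in place of the original datagenTable statement; the sequence also makes every even id (the ones the UDF returns non-null for) easy to spot in the log.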

Thanks! 

UDF 
// eval() of IntInputUdf (a ScalarFunction); LOG is an SLF4J Logger field of the class.
public @DataTypeHint("BYTES") byte[] eval(@DataTypeHint("INT") Integer inputNum) {
    byte[] results = inputNum.toString().getBytes(StandardCharsets.UTF_8);
    if (inputNum % 2 == 0) {
        // Even input: log with a marker and return the bytes.
        LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ", results, inputNum);
        return results;
    }
    // Odd input: log and return null, so the WHERE clause drops the row.
    LOG.info("*** *** input bytes {} and num {}.", results, inputNum);
    return null;
}
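The branching in eval can be exercised without a cluster. The sketch below is a hypothetical plain-Java copy of the method's logic (no Flink, no SLF4J); it shows that the byte arrays in the log are simply the UTF-8 codes of the number's decimal digits (45 is '-', 48 to 57 are '0' to '9'), and that odd inputs return null, including negative ones, where Java's % yields -1.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class EvalLogic {
    /** Same branching as IntInputUdf.eval, minus the logging. */
    static byte[] eval(Integer inputNum) {
        byte[] results = inputNum.toString().getBytes(StandardCharsets.UTF_8);
        // In Java, -3 % 2 == -1, so negative odd numbers also end up returning null.
        return inputNum % 2 == 0 ? results : null;
    }

    /** Formats the result the way the log shows it; "null" when eval returns null. */
    static String describe(Integer inputNum) {
        return Arrays.toString(eval(inputNum));
    }
}
```

For example, describe(1506311954) returns "[49, 53, 48, 54, 51, 49, 49, 57, 53, 52]", the array in the first log line, while describe(-1800690437) returns "null".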

Main class DDLs 
tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
tEnv.executeSql("CREATE TABLE datagenTable (\n"
        + "    id  INT\n"
        + ") WITH (\n"
        + " 'connector' = 'datagen',\n"
        + " 'number-of-rows' = '100',\n"
        + " 'rows-per-second' = '1'\n"
        + ")");
tEnv.executeSql("CREATE TABLE print_table (\n"
        + "    id_in_bytes  VARBINARY,\n"
        + "    id  INT\n"
        + ") WITH (\n"
        + " 'connector' = 'print'\n"
        + ")");
tEnv.executeSql("INSERT INTO print_table SELECT * FROM ("
        + " SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET"
        + " WHERE ET.`id_in_bytes` IS NOT NULL");
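One hypothesis that fits the log, offered as an assumption rather than a confirmed diagnosis: once the planner flattens the subquery, IntInputUdf(`id`) may appear both in the filter condition (ET.`id_in_bytes` IS NOT NULL) and in the projected column of one fused operator, and be evaluated once for each. Because the filter passes only rows where the UDF returns non-null (even inputs), only those rows would see a second call. The plain-Java simulation below models that hypothesis; it does not use Flink, and its names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class FusedCalcSimulation {
    /** Stand-in for IntInputUdf.eval: bytes for even inputs, null for odd ones. */
    static byte[] eval(Integer n, List<Integer> callLog) {
        callLog.add(n); // record each invocation, like the LOG.info calls in the UDF
        return n % 2 == 0 ? n.toString().getBytes(StandardCharsets.UTF_8) : null;
    }

    /**
     * Hypothetical fused filter + projection: the condition and the projected column
     * each evaluate the UDF, and the first result is not reused. Returns the call log.
     */
    static List<Integer> invocations(List<Integer> ids) {
        List<Integer> callLog = new ArrayList<>();
        for (Integer id : ids) {
            // WHERE IntInputUdf(id) IS NOT NULL
            if (eval(id, callLog) != null) {
                // SELECT IntInputUdf(id) AS id_in_bytes: evaluated a second time
                byte[] idInBytes = eval(id, callLog);
                // A real operator would emit the row (idInBytes, id) here.
            }
        }
        return callLog;
    }
}
```

For the four inputs in the log, invocations(List.of(1506311954, -1800690437, 1428877483, -1794263686)) returns [1506311954, 1506311954, -1800690437, 1428877483, -1794263686, -1794263686]: even inputs are called twice and odd ones once, the same pattern as the ### and *** lines. Whether the real job does this would still need checking, for instance in the plan printed by tEnv.explainSql(...).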

Logging 
2022-11-02 13:38:58,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954 .   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:58,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954 .   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:38:59,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437 .
2022-11-02 13:39:00,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483 .
2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686 .   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686 .   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
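To check a longer log than this excerpt, the entries can be grouped by input number. The sketch below is hypothetical tooling, not part of Flink: it parses the timestamp and the num value from each UDF log entry (assuming one entry per line, in the format shown above) and reports inputs logged twice within a short window. The 500 ms window is an assumption, chosen because datagen emits one row per second here, so two entries for the same input that close together are unlikely to be two generated rows.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class DuplicateCallFinder {
    // Captures "2022-11-02 13:38:58,760" and the value after "and num" in a UDF log entry.
    private static final Pattern ENTRY = Pattern.compile(
            "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) .*? and num (-?\\d+)");
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Returns the inputs logged at least twice within maxGapMillis, in first-seen order. */
    static Set<Integer> repeatedWithin(List<String> logLines, long maxGapMillis) {
        Map<Integer, LocalDateTime> lastSeen = new HashMap<>();
        Set<Integer> repeated = new LinkedHashSet<>();
        for (String line : logLines) {
            Matcher m = ENTRY.matcher(line);
            if (!m.find()) {
                continue; // not a UDF log entry
            }
            LocalDateTime ts = LocalDateTime.parse(m.group(1), TS);
            int num = Integer.parseInt(m.group(2));
            LocalDateTime previous = lastSeen.put(num, ts);
            if (previous != null && Duration.between(previous, ts).toMillis() <= maxGapMillis) {
                repeated.add(num);
            }
        }
        return repeated;
    }
}
```

Applied to the six entries above with a 500 ms window, it reports 1506311954 and -1794263686, the two inputs with paired ### lines.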
