Thanks for your explanation. The execution plan for the SQL `INSERT INTO 
print_table SELECT * FROM ( SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM 
datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL` is: 
` 
StreamPhysicalSink(table=[default_catalog.default_database.print_table], 
fields=[id_in_bytes, id]) 
StreamPhysicalCalc(select=[RandomUdf(id) AS id_in_bytes, id], where=[IS NOT 
NULL(RandomUdf(id))]) 
StreamPhysicalTableSourceScan(table=[[default_catalog, default_database, 
datagenTable]], fields=[id]) 
` 
From the plan, we can see that the UDF is called twice in StreamPhysicalCalc: 
once in the where condition and once in the select list. As a result, it seems 
a single row will be processed twice. 
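
To make the consequence of that plan concrete, here is a minimal plain-Java sketch (no Flink dependency) of a Calc that evaluates the same UDF expression once in the filter and once more in the projection. The class `CalcDoubleEvalSketch`, the helper `runCalc`, and the `AlternatingUdf` stand-in are hypothetical names for illustration only, not Flink APIs. The stand-in alternates between bytes and null so the run is reproducible, whereas the real RandomUdf is random. Under these assumptions, a row can pass the `IS NOT NULL` check and still be emitted with a null value, which would match the `+I[null, 1]` output reported in the quoted message below.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java simulation of a Calc that evaluates the same UDF expression
// in the filter and again in the projection. Not Flink code.
class CalcDoubleEvalSketch {

    // Hypothetical stand-in for RandomUdf: returns bytes on odd-numbered
    // calls and null on even-numbered calls, so the outcome is reproducible.
    static final class AlternatingUdf implements Function<Integer, byte[]> {
        int calls = 0;

        @Override
        public byte[] apply(Integer id) {
            calls++;
            return (calls % 2 == 1)
                    ? id.toString().getBytes(StandardCharsets.UTF_8)
                    : null;
        }
    }

    // Mirrors where=[IS NOT NULL(udf(id))] followed by select=[udf(id), id].
    static List<Object[]> runCalc(List<Integer> ids, Function<Integer, byte[]> udf) {
        List<Object[]> out = new ArrayList<>();
        for (Integer id : ids) {
            if (udf.apply(id) != null) {          // first evaluation: the filter
                byte[] projected = udf.apply(id); // second evaluation: the projection
                out.add(new Object[] {projected, id});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        AlternatingUdf udf = new AlternatingUdf();
        for (Object[] row : runCalc(List.of(1, 2, 3), udf)) {
            String first = (row[0] == null) ? "null" : "bytes";
            System.out.println("+I[" + first + ", " + row[1] + "]");
        }
        System.out.println("UDF calls: " + udf.calls);
    }
}
```

In this sketch every row passes the filter on the first call and receives null from the second call, so all emitted rows carry a null first field even though the filter checked for non-null.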

Best regards, 
Yuxia 


From: "Xinyi Yan" <yanxi...@apache.org> 
To: "yuxia" <luoyu...@alumni.sjtu.edu.cn> 
Cc: "User" <user@flink.apache.org> 
Sent: Friday, November 4, 2022, 5:28:30 AM 
Subject: Re: Question about UDF randomly processed input row twice 

OK. The datagen connector with the sequence option reproduces this issue 
easily, and it also produces an incorrect result. I used datagen to generate a 
sequence from 1 to 5 and let the UDF randomly return either null or bytes. 
Surprisingly, not only was the UDF executed twice, but the where clause also 
failed to apply the `IS NOT NULL` condition. This was a big shock to me: the 
`IS NOT NULL` condition in a where clause is a fundamental SQL feature and 
should not break. I have updated my findings in FLINK-29855 
(https://issues.apache.org/jira/browse/FLINK-29855), and here are the repro 
steps: 

Query: 
INSERT INTO print_table 
     SELECT * FROM ( 
           SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable 
     ) 
AS ET WHERE ET.`id_in_bytes` IS NOT NULL 

Result: 
+I[ null , 1] 
+I[[50], 2] 
+I[ null , 4] 


UDF 
public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    // Random integer in [1, 9]; even values return the bytes, odd values return null.
    int randomNumber = ((int) (Math.random() * (10 - 1))) + 1;
    LOG.info("[*][*][*] input num is {} and random number is {}. [*][*][*]", intputNum, randomNumber);
    if (randomNumber % 2 == 0) {
        LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ", results, intputNum);
        return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}

Log: 
2022-11-02 13:38:56,765 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:56,766 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:57,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:57,763 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:58,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:58,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:59,759 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:02,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:02,762 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:03,758 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
2022-11-02 13:39:04,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
2022-11-02 13:39:05,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
2022-11-02 13:39:06,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
2022-11-02 13:39:07,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:07,763 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:08,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
2022-11-02 13:39:09,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
2022-11-02 13:39:10,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
2022-11-02 13:39:11,759 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
2022-11-02 13:39:12,759 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835. 

On Thu, Nov 3, 2022 at 3:04 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote: 



The datagen connector may produce rows with the same values. 

From my side, in Flink, the UDF shouldn't process one row twice; otherwise, 
it would be a critical bug. 

Best regards, 
Yuxia 


From: "Xinyi Yan" <yanxi...@apache.org> 
To: "User" <user@flink.apache.org> 
Sent: Thursday, November 3, 2022, 6:59:20 AM 
Subject: Question about UDF randomly processed input row twice 

Hi all, 
I found a weird UDF behavior: a single thread processes the same row in the UDF 
twice; see FLINK-29855 (https://issues.apache.org/jira/browse/FLINK-29855) for 
more details. Basically, I created a datagen table with a random integer (1 row 
per second) and passed this value into the UDF. Inside the UDF, I simply take 
the input number modulo 2, convert the integer to a byte array, and log it for 
debugging purposes. As you can see, the UDF has been called twice for some of 
the rows. I'm not sure whether this duplicated UDF call is expected, or why it 
doesn't consistently produce duplicated calls for all rows. In case the local 
environment setup is a concern, I only have 1 task manager and 1 task slot in 
my local Flink cluster. 

Thanks! 

UDF 
public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    if (intputNum % 2 == 0) {
        LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ", results, intputNum);
        return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}

Main class DDLs 
tEnv.executeSql("CREATE FUNCTION IntInputUdf AS "
        + "'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
tEnv.executeSql("CREATE TABLE datagenTable (\n"
        + "    id  INT\n"
        + ") WITH (\n"
        + " 'connector' = 'datagen',\n"
        + " 'number-of-rows' = '100',\n"
        + " 'rows-per-second' = '1'\n"
        + ")");
tEnv.executeSql("CREATE TABLE print_table (\n"
        + "    id_in_bytes  VARBINARY,\n"
        + "    id  INT\n"
        + ") WITH (\n"
        + " 'connector' = 'print'\n"
        + ")");
tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( "
        + "SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable "
        + ") AS ET WHERE ET.`id_in_bytes` IS NOT NULL");

Logging 
2022-11-02 13:38:58,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954 .   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:58,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954 .   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:59,759 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437 .
2022-11-02 13:39:00,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input 
bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483 .
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686 .   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686 .   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
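
The pattern in the log above (each `###` entry appears twice, each `***` entry once) is consistent with the UDF being invoked once by the `IS NOT NULL` filter and, only for rows that pass, once more by the projection. The following plain-Java sketch illustrates that reading under the assumption that the filter is evaluated first and the projection only for surviving rows; it does not use Flink, and `CallCountSketch` and `countCalls` are hypothetical names. The `eval` logic mirrors the UDF above without the logging.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch: counts UDF invocations per input value when the filter
// and the projection each evaluate the UDF. Not Flink code.
class CallCountSketch {

    private final Map<Integer, Integer> calls = new LinkedHashMap<>();

    // Same logic as the UDF in this message: bytes for even input, null for odd.
    byte[] eval(Integer inputNum) {
        calls.merge(inputNum, 1, Integer::sum);
        byte[] results = inputNum.toString().getBytes(StandardCharsets.UTF_8);
        return (inputNum % 2 == 0) ? results : null;
    }

    // Assumed evaluation order: filter first, projection only if the filter passes.
    static Map<Integer, Integer> countCalls(List<Integer> ids) {
        CallCountSketch udf = new CallCountSketch();
        for (Integer id : ids) {
            if (udf.eval(id) != null) { // filter: IS NOT NULL(udf(id))
                udf.eval(id);           // projection: udf(id) AS id_in_bytes
            }
        }
        return udf.calls;
    }

    public static void main(String[] args) {
        // Input values taken from the log above.
        List<Integer> ids = List.of(1506311954, -1800690437, 1428877483, -1794263686);
        countCalls(ids).forEach((id, n) -> System.out.println(id + " -> " + n + " call(s)"));
    }
}
```

With these inputs, the even values (1506311954 and -1794263686) are evaluated twice and the odd values once, the same pairing the log shows.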



