The datagen connector may produce rows with the same values. In my view, a UDF in Flink shouldn't process one row twice; otherwise, it would be a critical bug.
Best regards,
Yuxia

From: "Xinyi Yan" <yanxi...@apache.org>
To: "User" <user@flink.apache.org>
Sent: Thursday, November 3, 2022 6:59:20 AM
Subject: Question about UDF randomly processed input row twice

Hi all,

I found a weird UDF behavior: a single thread calls the UDF twice for the same input row. See FLINK-29855 (https://issues.apache.org/jira/browse/FLINK-29855) for more details.

Basically, I created a datagen table with a random integer column (1 row per second) and passed this value into the UDF. Inside the UDF, I simply check the input number's parity (mod 2), convert the number's decimal string to a UTF-8 byte array, and log it for debugging purposes. As you can see in the log below, some of the rows were processed twice inside the UDF. I'm not sure whether this duplicated UDF call is expected, nor why it doesn't consistently produce duplicated calls for all rows.

In case the local environment setup is a concern: I only have 1 task manager and 1 task slot in my local Flink cluster.

Thanks!

UDF

    package org.apache.flink.playgrounds.spendreport;

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class IntInputUdf extends ScalarFunction {

        private static final Logger LOG = LoggerFactory.getLogger(IntInputUdf.class);

        // Returns the UTF-8 bytes of the number's decimal string for even inputs, null for odd inputs.
        public @DataTypeHint("BYTES") byte[] eval(@DataTypeHint("INT") Integer inputNum) {
            byte[] results = inputNum.toString().getBytes(StandardCharsets.UTF_8);
            if (inputNum % 2 == 0) {
                LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### ", results, inputNum);
                return results;
            }
            LOG.info("*** *** input bytes {} and num {}.", results, inputNum);
            return null;
        }
    }

Main class DDLs

    tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");

    tEnv.executeSql("CREATE TABLE datagenTable (\n" +
            "  id INT\n" +
            ") WITH (\n" +
            "  'connector' = 'datagen',\n" +
            "  'number-of-rows' = '100',\n" +
            "  'rows-per-second' = '1'\n" +
            ")");

    tEnv.executeSql("CREATE TABLE print_table (\n" +
            "  id_in_bytes VARBINARY,\n" +
            "  id INT\n" +
            ") WITH (\n" +
            "  'connector' = 'print'\n" +
            ")");

    tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL");

Logging

    2022-11-02 13:38:58,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
    2022-11-02 13:38:58,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
    2022-11-02 13:38:59,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
    2022-11-02 13:39:00,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
    2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
    2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
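One way to weigh the "datagen repeated a value" explanation against the "UDF was called twice" one is to look closely at the six posted log lines. The following is a minimal plain-Java sketch, assuming only the JDK (no Flink); the class name `UdfLogCheck` and its helpers are hypothetical. It re-implements the parity and UTF-8 logic of `eval` to confirm the logged byte arrays, then groups the log entries by input value and reports the time gap between repeated entries. With `'rows-per-second' = '1'`, distinct datagen rows appear roughly one second apart in the log, so repeated entries only 0 to 1 ms apart look more like two calls for one row than two separate rows, though the sketch cannot by itself say which operator made the calls.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UdfLogCheck {

    // Plain-Java copy of IntInputUdf.eval without Flink or logging:
    // UTF-8 bytes of the decimal string for even inputs, null for odd inputs.
    static byte[] eval(int inputNum) {
        byte[] results = Integer.toString(inputNum).getBytes(StandardCharsets.UTF_8);
        return inputNum % 2 == 0 ? results : null;
    }

    // (timestamp, num) pairs transcribed from the posted log; the millisecond
    // separator is changed from ',' to '.' so that LocalTime.parse accepts it.
    static final String[][] LOG_ENTRIES = {
        {"13:38:58.760", "1506311954"},
        {"13:38:58.761", "1506311954"},
        {"13:38:59.759", "-1800690437"},
        {"13:39:00.761", "1428877483"},
        {"13:39:01.761", "-1794263686"},
        {"13:39:01.761", "-1794263686"},
    };

    // Groups entries by input value (first-seen order) and reports parity, the number
    // of log lines, the gap between the first and last line, and eval's result.
    static List<String> summarize(String[][] entries) {
        Map<Integer, List<LocalTime>> byNum = new LinkedHashMap<>();
        for (String[] entry : entries) {
            byNum.computeIfAbsent(Integer.parseInt(entry[1]), k -> new ArrayList<>())
                 .add(LocalTime.parse(entry[0]));
        }
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Integer, List<LocalTime>> e : byNum.entrySet()) {
            int num = e.getKey();
            List<LocalTime> times = e.getValue();
            long gapMs = Duration.between(times.get(0), times.get(times.size() - 1)).toMillis();
            lines.add(String.format("num=%d even=%b calls=%d gapMs=%d bytes=%s",
                    num, num % 2 == 0, times.size(), gapMs, Arrays.toString(eval(num))));
        }
        return lines;
    }

    public static void main(String[] args) {
        summarize(LOG_ENTRIES).forEach(System.out::println);
    }
}
```

Running it prints one summary line per distinct input. The recomputed byte arrays match the logged ones, and both repeated values are even, i.e. exactly the inputs for which `eval` returns a non-null result and the row therefore passes the `WHERE ET.id_in_bytes IS NOT NULL` filter.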