Xinyi Yan created FLINK-29855:
---------------------------------

             Summary: UDF randomly processed input data twice
                 Key: FLINK-29855
                 URL: https://issues.apache.org/jira/browse/FLINK-29855
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.14.4
            Reporter: Xinyi Yan
         Attachments: IntInputUdf.java, SpendReport.java, example.log

To reproduce the issue:
 # Create a datagen table with a single INT column, id.
 # Create a UDF that only checks its input modulo 2 and logs each call.
 # Create a print table that prints the results.
 # Insert into the print table from the datagen table, applying the UDF to the id column.

The log shows that some of the input rows are processed twice, which I don't think is expected. In the excerpt below, every duplicated call is for an even input, i.e. a row for which the UDF returns a non-null value. If a row is processed twice, the behavior of the UDF changes completely.

I have also attached the main and UDF classes, as well as the log file, for additional info.

DDL
{code:java}
public static void main(String[] args) throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // Register the UDF under test.
    tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");

    // Source: 100 random INT rows, one per second.
    tEnv.executeSql("CREATE TABLE datagenTable (\n" +
            "    id  INT\n" +
            ") WITH (\n" +
            "    'connector' = 'datagen',\n" +
            "    'number-of-rows' = '100',\n" +
            "    'rows-per-second' = '1'\n" +
            ")");

    // Sink: prints every row it receives.
    tEnv.executeSql("CREATE TABLE print_table (\n" +
            "    id_in_bytes  VARBINARY,\n" +
            "    id  INT\n" +
            ") WITH (\n" +
            "    'connector' = 'print'\n" +
            ")");

    // The UDF result is used both in the projection and in the WHERE filter.
    tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( " +
            "SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET " +
            "WHERE ET.`id_in_bytes` IS NOT NULL");
}
{code}

UDF
{code:java}
public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    if (intputNum % 2 == 0) {
        // Even input: log the call and return the UTF-8 bytes of the number.
        LOG.info("### ### input bytes {} and num {}. "
                + "### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### ", results, intputNum);
        return results;
    }
    // Odd input: log the call and return null, so the row is filtered out.
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}
{code}

output
{code:java}
2022-11-02 13:38:56,765 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:56,766 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:57,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:57,763 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:58,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:58,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:59,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:02,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:02,762 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:03,758 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
2022-11-02 13:39:04,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
2022-11-02 13:39:05,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
2022-11-02 13:39:06,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
2022-11-02 13:39:07,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:07,763 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:08,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
2022-11-02 13:39:09,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
2022-11-02 13:39:10,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
2022-11-02 13:39:11,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
2022-11-02 13:39:12,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835.
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)