[ https://issues.apache.org/jira/browse/FLINK-29855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17628431#comment-17628431 ]
Xinyi Yan commented on FLINK-29855: ----------------------------------- [~lsy], thanks for the reply. I don't think the datagen source produces duplicated rows that often, especially since the datagen config already limits it to 1 row per second. Together with [~rovo98]'s sequence experiment, this is a clear signal that the UDF randomly processes the same input twice. > UDF randomly processed input data twice > ---------------------------------------- > > Key: FLINK-29855 > URL: https://issues.apache.org/jira/browse/FLINK-29855 > Project: Flink > Issue Type: Bug > Affects Versions: 1.14.4 > Reporter: Xinyi Yan > Priority: Critical > Attachments: IntInputUdf.java, SpendReport.java, example.log > > > Local flink cluster env: > 1 task manager and 1 task slot. > To reproduce the issue: > # create a datagen table with a single column int type of id with 1 row per > second. > # create a UDF that only mod input data with logging statements. > # create a print table that prints the results. > # insert data into the print table with UDF(input id column) execution from > the datagen table. > The logging shows that some of the data have been processed twice, which is > not expected I guess? This will totally change the behavior of the UDF if the > data has been processed twice. I also attached main and UDF classes, as well > as the logging file for additional info. 
> > DDL > > {code:java} > public static void main(String[] args) throws Exception { > EnvironmentSettings settings = > EnvironmentSettings.newInstance().build(); > > TableEnvironment tEnv = TableEnvironment.create(settings); > > tEnv.executeSql("CREATE FUNCTION IntInputUdf AS > 'org.apache.flink.playgrounds.spendreport.IntInputUdf'"); > tEnv.executeSql("CREATE TABLE datagenTable (\n" + > " id INT\n" + > ") WITH (\n" + > " 'connector' = 'datagen',\n" + > " 'number-of-rows' = '100',\n" + > " 'rows-per-second' = '1'\n" + > ")"); > tEnv.executeSql("CREATE TABLE print_table (\n" + > " id_in_bytes VARBINARY,\n" + > " id INT\n" + > ") WITH (\n" + > " 'connector' = 'print'\n" + > ")"); > tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT > IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE > ET.`id_in_bytes` IS NOT NULL"); > } {code} > > UDF > > {code:java} > public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer > intputNum) { > byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8); > if (intputNum % 2 == 0) { > LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ### > duplicated call??? ### DEBUG ### ### ", results, intputNum); > return results; > } > LOG.info("*** *** input bytes {} and num {}.", results, intputNum); > return null; > } {code} > output > > > {code:java} > 2022-11-02 13:38:56,765 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. > ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### > 2022-11-02 13:38:56,766 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. > ### ### DEBUG ### ### duplicated call??? 
### DEBUG ### ### > 2022-11-02 13:38:57,761 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542. > ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### > 2022-11-02 13:38:57,763 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542. > ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### > 2022-11-02 13:38:58,760 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. > ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### > 2022-11-02 13:38:58,761 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. > ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### > 2022-11-02 13:38:59,759 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** > input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437. > 2022-11-02 13:39:00,761 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** > input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483. > 2022-11-02 13:39:01,761 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. > ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### > 2022-11-02 13:39:01,761 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. > ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### > 2022-11-02 13:39:02,760 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542. 
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### > 2022-11-02 13:39:02,762 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542. > ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### > 2022-11-02 13:39:03,758 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** > input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753. > 2022-11-02 13:39:04,760 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** > input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429. > 2022-11-02 13:39:05,760 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** > input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449. > 2022-11-02 13:39:06,760 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** > input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607. > 2022-11-02 13:39:07,760 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326. > ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### > 2022-11-02 13:39:07,763 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### > input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326. > ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### > 2022-11-02 13:39:08,760 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** > input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417. > 2022-11-02 13:39:09,761 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** > input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501. > 2022-11-02 13:39:10,761 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** > input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843. 
> 2022-11-02 13:39:11,759 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** > input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791. > 2022-11-02 13:39:12,759 INFO > org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** > input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835. > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)