Hi all,
I ran into some odd UDF behavior: a single thread appears to invoke the
UDF twice for the same row. See FLINK-29855
<https://issues.apache.org/jira/browse/FLINK-29855> for more details.
I created a datagen table that emits a random integer (1 row per second)
and passed that value into the UDF. Inside the UDF, I take the input
modulo 2, convert the integer to a byte array, and log both for debugging
purposes. As the log below shows, the UDF is called twice for some of the
rows. I'm not sure whether this duplicated call is expected, or why it
doesn't happen consistently for every row. In case the local environment
setup matters: my local Flink cluster has only 1 task manager and 1 task
slot.

Thanks!

UDF

public @DataTypeHint("BYTES") byte[] eval(@DataTypeHint("INT") Integer inputNum) {
    // UTF-8 bytes of the decimal string form of the input.
    byte[] results = inputNum.toString().getBytes(StandardCharsets.UTF_8);
    if (inputNum % 2 == 0) {
        // Even input: log with the "###" marker and return the bytes.
        LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ", results, inputNum);
        return results;
    }
    // Odd input: log with the "***" marker and return null.
    LOG.info("*** *** input bytes {} and num {}.", results, inputNum);
    return null;
}
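
To make the logged byte arrays easier to read: the UDF returns the UTF-8
bytes of the number's decimal string, not its 4-byte binary form. Here is a
minimal standalone sketch of just that conversion. The class name
BytesSketch and the method toUtf8Bytes are hypothetical and not part of the
job; the two inputs are taken from the log further down.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper class, for illustration only; not part of the Flink job.
final class BytesSketch {

    // Same conversion the UDF performs: decimal string -> UTF-8 bytes.
    static byte[] toUtf8Bytes(Integer inputNum) {
        return inputNum.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Each digit becomes its ASCII code ('1' = 49, '5' = 53, ...).
        System.out.println(Arrays.toString(toUtf8Bytes(1506311954)));
        // prints [49, 53, 48, 54, 51, 49, 49, 57, 53, 52]

        // A negative number starts with '-' (45).
        System.out.println(Arrays.toString(toUtf8Bytes(-1800690437)));
        // prints [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55]
    }
}
```

Both printed arrays match the "input bytes" values in the log entries for
those numbers.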


Main class DDLs

tEnv.executeSql("CREATE FUNCTION IntInputUdf AS " +
                "'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
tEnv.executeSql("CREATE TABLE datagenTable (\n" +
                "    id  INT\n" +
                ") WITH (\n" +
                "    'connector' = 'datagen',\n" +
                "    'number-of-rows' = '100',\n" +
                "    'rows-per-second' = '1'\n" +
                ")");
tEnv.executeSql("CREATE TABLE print_table (\n" +
                "    id_in_bytes  VARBINARY,\n" +
                "    id  INT\n" +
                ") WITH (\n" +
                "    'connector' = 'print'\n" +
                ")");
tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT " +
                "IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET " +
                "WHERE ET.`id_in_bytes` IS NOT NULL");


Logging

2022-11-02 13:38:58,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:58,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:59,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
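
One pattern stands out in the log above: every doubled entry is a "###"
line (even input, non-null result), while the "***" lines (odd input, null
result) appear only once. A possible explanation, which is only my
assumption and not confirmed against the Flink planner, is that the UDF is
evaluated once for the IS NOT NULL filter and again for the projected
column when the row passes the filter. The sketch below is plain Java with
no Flink dependency. DuplicateCallSketch, eval, and process are
hypothetical names that only mimic that evaluation order, to check that it
would produce the same call counts.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; it does NOT use or reproduce Flink's generated code.
final class DuplicateCallSketch {

    // Records every invocation of eval, standing in for the LOG.info lines.
    static final List<Integer> calls = new ArrayList<>();

    // Same logic as the UDF: bytes for even inputs, null for odd inputs.
    static byte[] eval(Integer inputNum) {
        calls.add(inputNum);
        return inputNum % 2 == 0
                ? inputNum.toString().getBytes(StandardCharsets.UTF_8)
                : null;
    }

    // ASSUMPTION: the filter and the projection each call the UDF separately.
    static byte[] process(Integer id) {
        if (eval(id) == null) {   // call 1: WHERE id_in_bytes IS NOT NULL
            return null;          // row dropped, so no second call
        }
        return eval(id);          // call 2: SELECT IntInputUdf(id)
    }

    public static void main(String[] args) {
        // The four input values that appear in the log.
        int[] ids = {1506311954, -1800690437, 1428877483, -1794263686};
        for (int id : ids) {
            process(id);
        }
        System.out.println(calls);
        // prints [1506311954, 1506311954, -1800690437, 1428877483, -1794263686, -1794263686]
    }
}
```

The printed sequence has the same order and the same repeats as the log
entries, which is consistent with the assumption but does not prove it.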
