[ 
https://issues.apache.org/jira/browse/FLINK-21573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347654#comment-17347654
 ] 

Qishang Zhong commented on FLINK-21573:
---------------------------------------

Hi [~lzljs3620320] . In this case, UDF `split_str` will be called twice.
{code:java}
//代码占位符
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    DataStream<Order> orderA =
            env.fromCollection(
                    Arrays.asList(
                            new Order(1L, "beer", 3)));

    Table tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), 
$("amount"));
    tEnv.createTemporaryFunction("split_str", new SubstringFunction());

    Table table =
            tEnv.sqlQuery("SELECT user, product, amount, d, UPPER(d) as e\n" +
                    "FROM (\n" +
                    "  SELECT split_str(product,1,2) as d, *\n" +
                    "  FROM " + tableA + "\n" +
                    ") T");
    tEnv.toAppendStream(table, Row.class).print();
    env.execute();
}

public static class Order {
    public Long user;
    public String product;
    public int amount;

    public Order() {
    }

    public Order(Long user, String product, int amount) {
        this.user = user;
        this.product = product;
        this.amount = amount;
    }
}

public static class SubstringFunction extends ScalarFunction {
    public String eval(String s, Integer begin, Integer end) {
        System.out.println(s);
        return s.substring(begin, end);
    }
}
{code}
I have some ETL jobs need to connect jdbc or RPC calls in the UDF. It will have 
a big effect.

> Support expression reuse in codegen
> -----------------------------------
>
>                 Key: FLINK-21573
>                 URL: https://issues.apache.org/jira/browse/FLINK-21573
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>            Reporter: Benchao Li
>            Priority: Major
>
> Currently there is no expression reuse in codegen, and this may result in 
> more CPU overhead in some cases. E.g.
> {code:java}
> SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as key3
> FROM (
>   SELECT dump_json_to_map(col1) as my_map
>   FROM T
> )
> {code}
> `dump_json_to_map` will be called 3 times.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to