[ https://issues.apache.org/jira/browse/FLINK-23555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
weibowen updated FLINK-23555:
-----------------------------
    Description: 
When we write SQL like
{code:java}
select udf2(udf1(field), udf3(udf1(field) ...{code}
udf1(field) is invoked twice. If udf1 performs poorly, this has a large impact on the whole task, and the impact grows with every additional invocation.

I hope that, no matter how many times udf1(field) is written in the SQL, Flink will apply common subexpression elimination and invoke it only once.

I did some work on this, and the attachments show the result.

sql.png shows the SQL logic. The job reads from a Kafka source and writes to a blackhole sink, with parallelism 1.
The UDF `testcse` does nothing except sleep for 20 milliseconds, while `testcse2`, `testcse3` and `testcse4` are the same UDF registered under different aliases and do nothing at all.
As expected, performance after the optimization is approximately 3 times what it was before, since `testcse(sid)` is written 3 times in the SQL.

  was:
When we write SQL like
{code:java}
select udf2(udf1(field), udf3(udf1(field) ...{code}
udf1(field) is invoked twice. If udf1 performs poorly, this has a large impact on the whole task, and the impact grows with every additional invocation.

I hope that, no matter how many times udf1(field) is written in the SQL, Flink will apply common subexpression elimination and invoke it only once.

I did some work on this, and the attachments show the result.

> Improve common subexpression elimination by using local references
> ------------------------------------------------------------------
>
>                 Key: FLINK-23555
>                 URL: https://issues.apache.org/jira/browse/FLINK-23555
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: weibowen
>            Priority: Major
>             Fix For: 1.14.0
>
>         Attachments: performance_after_optimization.png,
> performance_before_optimization.png, sql.png, udf.png
>
>
> When we write SQL like
> {code:java}
> select udf2(udf1(field), udf3(udf1(field) ...{code}
> udf1(field) is invoked twice. If udf1 performs poorly, this has a large
> impact on the whole task, and the impact grows with every additional
> invocation.
> I hope that, no matter how many times udf1(field) is written in the SQL,
> Flink will apply common subexpression elimination and invoke it only once.
> I did some work on this, and the attachments show the result.
>
> The sql.png shows the SQL logic. The job reads from a Kafka source and
> writes to a blackhole sink, with parallelism 1.
> The UDF `testcse` does nothing except sleep for 20 milliseconds, while
> `testcse2`, `testcse3` and `testcse4` are the same UDF registered under
> different aliases and do nothing at all.
> As expected, performance after the optimization is approximately 3 times
> what it was before, since `testcse(sid)` is written 3 times in the SQL.
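
To make the request concrete, here is a minimal standalone sketch of the idea in plain Java. It is not the planner change itself and uses no Flink API: the class `CseSketch`, the counter `calls`, and the bodies of `udf1`, `udf2` and `udf3` are hypothetical stand-ins chosen for illustration. It only contrasts evaluating `udf1(field)` at every occurrence with binding its result to a local reference and reusing it.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class CseSketch {
    // Counts how often the (hypothetically expensive) udf1 is invoked.
    public static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for a slow UDF such as `testcse`.
    public static String udf1(String field) {
        calls.incrementAndGet();
        return field.toUpperCase();
    }

    // Hypothetical cheap UDFs that consume the result of udf1.
    public static String udf2(String s) { return "a:" + s; }
    public static String udf3(String s) { return "b:" + s; }

    // Without CSE: udf1(field) is evaluated once per occurrence in the projection.
    public static String[] projectNaive(String field) {
        return new String[] { udf2(udf1(field)), udf3(udf1(field)) };
    }

    // With CSE: the shared subexpression is evaluated once, stored in a
    // local reference, and every occurrence reads that reference.
    public static String[] projectWithCse(String field) {
        final String local0 = udf1(field);
        return new String[] { udf2(local0), udf3(local0) };
    }

    public static void main(String[] args) {
        calls.set(0);
        String[] naive = projectNaive("sid");
        System.out.println(Arrays.toString(naive) + " udf1 calls: " + calls.get());

        calls.set(0);
        String[] cse = projectWithCse("sid");
        System.out.println(Arrays.toString(cse) + " udf1 calls: " + calls.get());
    }
}
```

Both projections return the same values, but the second invokes `udf1` once instead of twice. Reusing the result this way is only safe when the shared function is deterministic, so a real implementation would have to leave non-deterministic calls alone.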