[ 
https://issues.apache.org/jira/browse/FLINK-22994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lynn1.zhang updated FLINK-22994:
--------------------------------
    Description: 
h1. Background

In some nested UDF invocations, Flink converts the UDF result to an external 
object and then converts it to an internal object (toInternalOrNull) to use as 
the parameter of the next UDF invocation.
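
The redundant conversion described above can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink's generated code: InternalMap, toInternal, toExternal, the sample map contents, and the conversion counter are hypothetical stand-ins, and the sketch models nothing more than a pair of conversions sitting between an inner and an outer UDF call.

```java
import java.util.HashMap;
import java.util.Map;

class NestedUdfSketch {
    // Hypothetical stand-in for a runtime-internal map representation.
    static final class InternalMap {
        final String[] keys;
        final String[] values;

        InternalMap(String[] keys, String[] values) {
            this.keys = keys;
            this.values = values;
        }
    }

    // Counts how many conversions were performed (for illustration only).
    static int conversions = 0;

    // Hypothetical converter: external Map -> internal representation.
    static InternalMap toInternal(Map<String, String> external) {
        conversions++;
        String[] keys = new String[external.size()];
        String[] values = new String[external.size()];
        int i = 0;
        for (Map.Entry<String, String> e : external.entrySet()) {
            keys[i] = e.getKey();
            values[i] = e.getValue();
            i++;
        }
        return new InternalMap(keys, values);
    }

    // Hypothetical converter: internal representation -> external Map.
    static Map<String, String> toExternal(InternalMap internal) {
        conversions++;
        Map<String, String> external = new HashMap<>();
        for (int i = 0; i < internal.keys.length; i++) {
            external.put(internal.keys[i], internal.values[i]);
        }
        return external;
    }

    // Inner UDF: returns a map of IP info (made-up sample data).
    static Map<String, String> ipip(int ip) {
        Map<String, String> info = new HashMap<>();
        info.put("country", "country-" + ip);
        info.put("region", "region-" + ip);
        return info;
    }

    // Outer UDF: reads one field from the map.
    static String ip2Country(Map<String, String> ipInfo) {
        return ipInfo.get("country");
    }

    // Nested call with a conversion round trip between the two UDFs.
    static String withRoundTrip(int ip) {
        InternalMap internal = toInternal(ipip(ip));
        return ip2Country(toExternal(internal));
    }

    // Nested call that hands the inner result straight to the outer UDF.
    static String direct(int ip) {
        return ip2Country(ipip(ip));
    }

    public static void main(String[] args) {
        System.out.println(withRoundTrip(1) + " conversions=" + conversions);
        conversions = 0;
        System.out.println(direct(1) + " conversions=" + conversions);
    }
}
```

Both paths return the same value; the direct path simply skips two map copies per record, which is the kind of per-record cost that grows with the map size.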
h1. Performance Comparison with and without MapMapConverter

Conditions: Source = Kafka; Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz

UDF introduction:
 * ipip: input: int ip; output: map ip_info (map size = 14).
 * ip_2_country: input: map ip_info; output: string country.
 * ip_2_region: input: map ip_info; output: string region.
 * ip_2_isp_domain: input: map ip_info; output: string isp.
 * ip_2_timezone: input: map ip_info; output: string timezone.

h5. Throughput without UDF invocation: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. Throughput with UDF invocation: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. Throughput with nested UDF invocation: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. Throughput with nested UDF invocation after this change: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!
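
To put the four figures above in relation to each other, the ratios can be computed directly. The sketch below uses only the throughput numbers reported in this issue; the class and method names are hypothetical.

```java
import java.util.Locale;

class ThroughputRatios {
    // Ratio of two throughput figures given in the same unit (k/s).
    static double ratio(double numerator, double denominator) {
        return numerator / denominator;
    }

    public static void main(String[] args) {
        double udf = 183.24;          // with UDF invocation
        double nestedBefore = 41.42;  // nested UDF invocation, before
        double nestedAfter = 174.41;  // nested UDF invocation, after

        // Speedup of the nested case: prints "4.21x"
        System.out.println(String.format(Locale.ROOT, "%.2fx", ratio(nestedAfter, nestedBefore)));
        // Nested-after relative to the non-nested UDF case: prints "95.2%"
        System.out.println(String.format(Locale.ROOT, "%.1f%%", 100 * ratio(nestedAfter, udf)));
    }
}
```

In other words, the nested case becomes roughly 4.2 times faster and reaches about 95% of the throughput measured for the non-nested UDF invocation.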

 

  was:
h1. Background

Flink maintains the UDF result as BinaryData, such as BinaryStringData. When a 
nested UDF is invoked, as in select useless(int_ip_2_string(ip)), the result of 
int_ip_2_string(ip) goes through toInternalOrNull and then toExternal.

Below is the generated code:

!image-2021-06-15-15-18-12-619.png!

This issue improves it as follows:

!image-2021-06-15-15-19-01-103.png!
h1. Performance Comparison

Conditions: Source = Kafka; Schema = PB with snappy; Flink Slot = 1; 
taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 CPU 
@ 2.30GHz

UDF introduction:
 * ipip: input: int ip; output: map ip_info (map size = 14).
 * ip_2_country: input: map ip_info; output: string country.
 * ip_2_region: input: map ip_info; output: string region.
 * ip_2_isp_domain: input: map ip_info; output: string isp.
 * ip_2_timezone: input: map ip_info; output: string timezone.

h5. Throughput without UDF invocation: 764.50 k/s

!image-2021-06-15-15-27-26-739.png!
h5. Throughput with UDF invocation: 183.24 k/s

!image-2021-06-15-15-42-08-065.png!
h5. Throughput with nested UDF invocation: 41.42 k/s

!image-2021-06-15-15-29-09-773.png!
h5. Throughput with nested UDF invocation after this change: 174.41 k/s

!image-2021-06-15-15-30-14-775.png!

 


> Improve the performance of invoking nesting udf
> -----------------------------------------------
>
>                 Key: FLINK-22994
>                 URL: https://issues.apache.org/jira/browse/FLINK-22994
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.4
>         Environment: h5.  
>            Reporter: lynn1.zhang
>            Assignee: lynn1.zhang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: StringConverterTest.java, Test.java, 
> image-2021-06-15-15-18-12-619.png, image-2021-06-15-15-19-01-103.png, 
> image-2021-06-15-15-27-26-739.png, image-2021-06-15-15-28-28-137.png, 
> image-2021-06-15-15-29-09-773.png, image-2021-06-15-15-30-14-775.png, 
> image-2021-06-15-15-42-08-065.png, new_projection_code, old_projection_code, 
> test.sql
>
>
> h1. Background
> In some nested UDF invocations, Flink converts the UDF result to an external 
> object and then converts it to an internal object (toInternalOrNull) to use as 
> the parameter of the next UDF invocation.
> h1. Performance Comparison with and without MapMapConverter
> Conditions: Source = Kafka; Schema = PB with snappy; Flink Slot = 1; 
> taskmanager.memory.process.size=4g; Linux Core = Intel(R) Xeon(R) Gold 5218 
> CPU @ 2.30GHz
>
> UDF introduction:
>  * ipip: input: int ip; output: map ip_info (map size = 14).
>  * ip_2_country: input: map ip_info; output: string country.
>  * ip_2_region: input: map ip_info; output: string region.
>  * ip_2_isp_domain: input: map ip_info; output: string isp.
>  * ip_2_timezone: input: map ip_info; output: string timezone.
> h5. Throughput without UDF invocation: 764.50 k/s
> !image-2021-06-15-15-27-26-739.png!
> h5. Throughput with UDF invocation: 183.24 k/s
> !image-2021-06-15-15-42-08-065.png!
> h5. Throughput with nested UDF invocation: 41.42 k/s
> !image-2021-06-15-15-29-09-773.png!
> h5. Throughput with nested UDF invocation after this change: 174.41 k/s
> !image-2021-06-15-15-30-14-775.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
