[ 
https://issues.apache.org/jira/browse/FLINK-31059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691464#comment-17691464
 ] 

miamiaoxyz commented on FLINK-31059:
------------------------------------

I first used the SQL client to test the feature, but the option
`exec.hive.native-agg-function.enabled` did not take effect there.


I then used IT cases to verify.

1. I used an IT case that turns `exec.hive.native-agg-function.enabled` on and
off and verifies that the two results are the same. The testSql function checks
whether the same SQL returns the same result with
`exec.hive.native-agg-function.enabled` on and off.

I verified via the IT case that the plan uses HashAgg when
`exec.hive.native-agg-function.enabled` is turned on, and SortAgg when it is
turned off.
!image-2023-02-21-15-45-48-226.png|width=549,height=234!
It passes all the IT cases below.
!image-2023-02-21-15-46-13-966.png|width=501,height=371!
2. I verified that the data results are the same when the sum/count/avg/min/max
functions are combined in one query, with
`exec.hive.native-agg-function.enabled` on and off, using the IT case below.

I verified via the IT case that the plan uses HashAgg when
`exec.hive.native-agg-function.enabled` is turned on, and SortAgg when it is
turned off.

!image-2023-02-21-15-49-58-854.png|width=536,height=219!

3. The max function does not support `array` and `struct`. The count function
does not store the `array` or `struct` value in the aggregation buffer; it uses
bigint instead, so hash-agg is chosen.

!image-2023-02-21-15-59-44-470.png|width=1016,height=189!

4. `first_value` and `last_value` are not implemented in Hive (see
[https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)]),
so I used `collect_set` to test instead. All the plans use SortAgg and return
the same result, which meets expectations.

```

--- turn on `table.exec.hive.native-agg-function.enabled`

== Abstract Syntax Tree ==
LogicalProject(x=[$0], _o__c1=[$1])
+- LogicalAggregate(group=[{0}], agg#0=[collect_set($1)])
   +- LogicalProject($f0=[$0], $f1=[$1])
      +- LogicalTableScan(table=[[test-catalog, default, foo]])

== Optimized Physical Plan ==
SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_collect_set($f1) AS $f1])
+- Sort(orderBy=[x ASC])
   +- Exchange(distribution=[hash[x]])
      +- LocalSortAggregate(groupBy=[x], select=[x, Partial_collect_set(y) AS $f1])
         +- Sort(orderBy=[x ASC])
            +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])

== Optimized Execution Plan ==
SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_collect_set($f1) AS $f1])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[x ASC])
      +- Exchange(distribution=[hash[x]])
         +- LocalSortAggregate(groupBy=[x], select=[x, Partial_collect_set(y) AS $f1])
            +- Exchange(distribution=[forward])
               +- Sort(orderBy=[x ASC])
                  +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])

--- turn off `table.exec.hive.native-agg-function.enabled`

== Abstract Syntax Tree ==
LogicalProject(x=[$0], _o__c1=[$1])
+- LogicalAggregate(group=[{0}], agg#0=[collect_set($1)])
   +- LogicalProject($f0=[$0], $f1=[$1])
      +- LogicalTableScan(table=[[test-catalog, default, foo]])

== Optimized Physical Plan ==
SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_collect_set($f1) AS $f1])
+- Sort(orderBy=[x ASC])
   +- Exchange(distribution=[hash[x]])
      +- LocalSortAggregate(groupBy=[x], select=[x, Partial_collect_set(y) AS $f1])
         +- Sort(orderBy=[x ASC])
            +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])

== Optimized Execution Plan ==
SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_collect_set($f1) AS $f1])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[x ASC])
      +- Exchange(distribution=[hash[x]])
         +- LocalSortAggregate(groupBy=[x], select=[x, Partial_collect_set(y) AS $f1])
            +- Exchange(distribution=[forward])
               +- Sort(orderBy=[x ASC])
                  +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])

```
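
A plan dump like the one above can also be checked mechanically instead of by
eye. The following is only a rough sketch: `agg_strategies` is a hypothetical
helper, not part of Flink, and it assumes that hash-based nodes are printed as
`HashAggregate` / `LocalHashAggregate`, by analogy with the `SortAggregate` /
`LocalSortAggregate` names shown in the plans.

```python
import re

# Matches aggregate nodes such as "SortAggregate(" or "LocalSortAggregate(".
# The Hash* spellings are an assumption mirroring the Sort* names in the dump.
_AGG_NODE = re.compile(r"\b(?:Local)?(Hash|Sort)Aggregate\(")


def agg_strategies(plan: str) -> set:
    """Return the aggregation strategies ("hash", "sort") found in a plan text."""
    return {m.group(1).lower() for m in _AGG_NODE.finditer(plan)}


# Excerpt of the physical plan from this comment (collect_set case).
plan_on = """
SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_collect_set($f1) AS $f1])
+- Sort(orderBy=[x ASC])
   +- Exchange(distribution=[hash[x]])
      +- LocalSortAggregate(groupBy=[x], select=[x, Partial_collect_set(y) AS $f1])
"""

print(agg_strategies(plan_on))  # {'sort'}
```

Plain `Sort(...)` nodes and `hash[x]` distributions are ignored on purpose, so
only the aggregate operators decide the outcome.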

 

 

!image-2023-02-21-16-31-58-361.png|width=620,height=261!

 

5. I disabled hash-agg to force sort-agg for all of the tests above. The
results with hash-agg forcibly disabled are the same as the results with
`exec.hive.native-agg-function.enabled` turned on and off, which meets
expectations.

!image-2023-02-21-16-35-46-294.png|width=632,height=392!
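
The on/off comparison used in the checks above boils down to comparing two
result sets without depending on row order, since a batch query without ORDER BY
does not guarantee one. A minimal sketch of such a comparison follows; `diff_rows`
is a hypothetical helper and is not the testSql function from the IT case.

```python
from collections import Counter


def diff_rows(rows_on, rows_off):
    """Return (only_on, only_off): rows, with multiplicity, seen in just one run.

    Rows must be hashable once converted to tuples, so nested lists
    (for example array results) need to be turned into tuples beforehand.
    """
    on = Counter(map(tuple, rows_on))
    off = Counter(map(tuple, rows_off))
    only_on = sorted((on - off).elements(), key=repr)
    only_off = sorted((off - on).elements(), key=repr)
    return only_on, only_off


# Same rows in a different order: no difference is reported.
assert diff_rows([("a", 3), ("b", 1)], [("b", 1), ("a", 3)]) == ([], [])

# A differing aggregate value shows up on both sides.
assert diff_rows([("a", 3)], [("a", 4)]) == ([("a", 3)], [("a", 4)])
```

Using a multiset rather than a set keeps duplicate rows significant, which
matters when a query can legitimately return the same row twice.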

 

Problems:

a. `exec.hive.native-agg-function.enabled` does not take effect in the SQL
client: hash-agg is not chosen there.

!https://intranetproxy.alipay.com/skylark/lark/0/2023/png/83756403/1676952029939-182fa078-3a07-4e45-bdbb-832f7f74c838.png|width=703,height=383,id=u4fc84338!

b. Enabling and disabling `table.exec.hive.native-agg-function.enabled` gives
different results.

!image-2023-02-21-16-28-22-038.png|width=618,height=283!

!image-2023-02-21-16-29-42-983.png|width=713,height=129!
 

> Release Testing: Verify FLINK-29717 Supports hive udaf such as sum/count by 
> native implementation
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31059
>                 URL: https://issues.apache.org/jira/browse/FLINK-31059
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Hive
>    Affects Versions: 1.17.0
>            Reporter: dalongliu
>            Assignee: miamiaoxyz
>            Priority: Blocker
>             Fix For: 1.17.0
>
>         Attachments: image-2023-02-21-15-45-48-226.png, 
> image-2023-02-21-15-46-13-966.png, image-2023-02-21-15-47-54-043.png, 
> image-2023-02-21-15-49-58-854.png, image-2023-02-21-15-59-44-470.png, 
> image-2023-02-21-16-28-22-038.png, image-2023-02-21-16-29-42-983.png, 
> image-2023-02-21-16-31-58-361.png, image-2023-02-21-16-35-46-294.png
>
>
> This task aims to verify 
> [FLINK-29717|https://issues.apache.org/jira/browse/FLINK-29717] which 
> improves the hive udaf performance.
> As described in the documentation [PR|https://github.com/apache/flink/pull/21789], 
> please verify:
> 1. Enabling the option `table.exec.hive.native-agg-function.enabled`, use the 
> sum/count/avg/min/max functions separately in the query to verify if the 
> hash-agg strategy is chosen via plan, and verify if the data results are the 
> same as when the option `table.exec.hive.native-agg-function.enabled` is 
> disabled.
> 2. Enabling the option `table.exec.hive.native-agg-function.enabled`, combine 
> sum/count/avg/min/max functions in query, verify if the hash-agg strategy is 
> chosen via plan, and verify if the data results are the same as when option 
> `table.exec.hive.native-agg-function.enabled` is disabled.
> 3. Enabling the option `table.exec.hive.native-agg-function.enabled`, count 
> or max array&struct and other complex types in query, verify whether the 
> sort-agg strategy is chosen via plan, verify whether the data result is the 
> same as when option `table.exec.hive.native-agg-function.enabled` is disabled.
> 4. Enabling the option `table.exec.hive.native-agg-function.enabled`, use the 
> sum/count and first_value/last_value functions in the query simultaneously, 
> verify that the sort-agg strategy is chosen via plan, verify that the data is 
> the same as when option `table.exec.hive.native-agg-function.enabled` is 
> disabled.
> 5. Enabling the option `table.exec.hive.native-agg-function.enabled`, use the 
> sum/count/avg/min/max functions in the query and open sort-agg strategy 
> forcibly, verify that the data results are the same as when option 
> `table.exec.hive.native-agg-function.enabled` is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
