Davis Zhang created HUDI-9420:
---------------------------------
Summary: Spark SQL selecting some columns is significantly slower than
selecting others
Key: HUDI-9420
URL: https://issues.apache.org/jira/browse/HUDI-9420
Project: Apache Hudi
Issue Type: Bug
Reporter: Davis Zhang
{code:java}
scala>
scala> spark.sql("set
spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
res310: org.apache.spark.sql.DataFrame = [key: string, value: string]
scala> val startTime = System.currentTimeMillis()
startTime: Long = 1747436173549
scala>
scala> spark.sql("""
| SELECT h.secKey
| FROM hudi h
| JOIN parquet_table p
| ON h.key = p.key
| and p.partition="2025-01-21"
| limit 10
| """).collect()
res311: Array[org.apache.spark.sql.Row] =
Array([000-108b5602-3e05-45f6-95ae-eb8bbc714b80],
[000-74d77871-1295-4763-90b1-101b4d5edea2],
[000-a7ad8185-5487-4376-b132-80de4637204d],
[000-6c787dc5-a040-449c-9ffc-4835d0cea95a],
[000-901fc912-ce18-41d2-b868-550e196e5608],
[000-6931bbb2-7cea-42bb-be43-863570324bec],
[000-c43ecf5c-d9d1-47d8-8543-aeded4b23866],
[000-032b3c28-49b1-4bd4-bdd3-f665686728bd],
[000-30fddb94-02a5-4727-bbd7-cccf72c6023e],
[000-2483dab1-3ea8-4b33-8607-079f94d1fe32])
scala>
scala> // Calculate and print duration
scala> val endTime = System.currentTimeMillis()
endTime: Long = 1747436281181
scala> val durationInSeconds = (endTime - startTime) / 1000.0
durationInSeconds: Double = 107.632
scala> println(s"Query execution time: $durationInSeconds seconds")
Query execution time: 107.632 seconds
scala>
scala>
scala> spark.sql("set
spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
res313: org.apache.spark.sql.DataFrame = [key: string, value: string]
scala> val startTime = System.currentTimeMillis()
startTime: Long = 1747436281326
scala>
scala> spark.sql("""
| SELECT h.textField1
| FROM hudi h
| JOIN parquet_table p
| ON h.key = p.key
| and p.partition="2025-01-21"
| limit 10
| """).collect()
res314: Array[org.apache.spark.sql.Row] =
Array([abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
[abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
[abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
[abcdefghij...
scala>
scala> // Calculate and print duration
scala> val endTime = System.currentTimeMillis()
endTime: Long = 1747436327443
scala> val durationInSeconds = (endTime - startTime) / 1000.0
durationInSeconds: Double = 46.117
scala> println(s"Query execution time: $durationInSeconds seconds")
Query execution time: 46.117 seconds
{code}
Table schema
{code:java}
{ "name" : "key",           ←------- record key with RLI enabled (OH hudi case)
  "type" : [ "null", "string" ], "default" : null },
{ "name" : "secKey",        ←------- column with secondary index
  "type" : [ "null", "string" ], "default" : null },
{ "name" : "date", "type" : [ "null", "string" ], "default" : null },
{ "name" : "intField", "type" : [ "null", "long" ], "default" : null },
{ "name" : "city", "type" : [ "null", "string" ], "default" : null },
{ "name" : "textField1",    ←------- 256 KB before compression
  "type" : [ "null", "string" ], "default" : null },
{ "name" : "textField2",    ←------- 256 KB before compression
  "type" : [ "null", "string" ], "default" : null },
{ "name" : "textField3",    ←------- 512 KB before compression
  "type" : [ "null", "string" ], "default" : null },
{ "name" : "textField4",    ←------- 1024 KB / 1 MB before compression
  "type" : [ "null", "string" ], "default" : null },
{ "name" : "decimalField", "type" : [ "null", "float" ], "default" : null },
{ "name" : "longField", "type" : [ "null", "long" ], "default" : null },
{ "name" : "partition",     ←------- used as partition column
  "type" : [ "null", "int" ], "default" : null },
{ "name" : "incrLongField", "type" : [ "null", "long" ], "default" : null } ]} {code}
* COW table created with hudi 1.x
* *10 partitions, per partition 10GB* worth of data, *total size is 100GB*
* The fan out from secIdx -> record_key would be 1 as they are independently
generated UUID.
* Per record size before compression is around 2 MB.
* Per index file group size on disk is fine-tuned to be 55-60 MB.
* Per data block size on disk is 128 MB.
Spark 3.5.2. Hudi 1.0.2
--
This message was sent by Atlassian Jira
(v8.20.10#820010)