Davis Zhang created HUDI-9420:
---------------------------------

             Summary: Spark SQL: selecting some columns is significantly slower than 
selecting others
                 Key: HUDI-9420
                 URL: https://issues.apache.org/jira/browse/HUDI-9420
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Davis Zhang


{code:java}
scala> 


scala> spark.sql("set 
spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
res310: org.apache.spark.sql.DataFrame = [key: string, value: string]


scala> val startTime = System.currentTimeMillis()
startTime: Long = 1747436173549


scala> 


scala> spark.sql("""
     |   SELECT h.secKey
     |   FROM hudi h
     |   JOIN parquet_table p
     |   ON h.key = p.key
     |   and p.partition="2025-01-21"
     |   limit 10
     | """).collect()
res311: Array[org.apache.spark.sql.Row] = 
Array([000-108b5602-3e05-45f6-95ae-eb8bbc714b80], 
[000-74d77871-1295-4763-90b1-101b4d5edea2], 
[000-a7ad8185-5487-4376-b132-80de4637204d], 
[000-6c787dc5-a040-449c-9ffc-4835d0cea95a], 
[000-901fc912-ce18-41d2-b868-550e196e5608], 
[000-6931bbb2-7cea-42bb-be43-863570324bec], 
[000-c43ecf5c-d9d1-47d8-8543-aeded4b23866], 
[000-032b3c28-49b1-4bd4-bdd3-f665686728bd], 
[000-30fddb94-02a5-4727-bbd7-cccf72c6023e], 
[000-2483dab1-3ea8-4b33-8607-079f94d1fe32])


scala> 


scala> // Calculate and print duration


scala> val endTime = System.currentTimeMillis()
endTime: Long = 1747436281181


scala> val durationInSeconds = (endTime - startTime) / 1000.0
durationInSeconds: Double = 107.632


scala> println(s"Query execution time: $durationInSeconds seconds")
Query execution time: 107.632 seconds


scala> 


scala> 


scala> spark.sql("set 
spark.sql.optimizer.dynamicPruningOnIndexedCol.enabled=false")
res313: org.apache.spark.sql.DataFrame = [key: string, value: string]


scala> val startTime = System.currentTimeMillis()
startTime: Long = 1747436281326


scala> 


scala> spark.sql("""
     |   SELECT h.textField1
     |   FROM hudi h
     |   JOIN parquet_table p
     |   ON h.key = p.key
     |   and p.partition="2025-01-21"
     |   limit 10
     | """).collect()
res314: Array[org.apache.spark.sql.Row] = 
Array([abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
 
[abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
 
[abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz|abcdefghijklmnopqrstuvwxyz],
 [abcdefghij...


scala> 


scala> // Calculate and print duration


scala> val endTime = System.currentTimeMillis()
endTime: Long = 1747436327443


scala> val durationInSeconds = (endTime - startTime) / 1000.0
durationInSeconds: Double = 46.117


scala> println(s"Query execution time: $durationInSeconds seconds")
Query execution time: 46.117 seconds

 {code}
Table schema
{code:java}
{    "name" : "key", ←------- Record key with RLI enabled (OH hudi case)    
"type" : [ "null", "string" ],    "default" : null  }, {    "name" : "secKey", 
←---- column with secondary index   "type" : [ "null", "string" ],    "default" 
: null  }, {    "name" : "date",    "type" : [ "null", "string" ],    "default" 
: null  }, {    "name" : "intField",    "type" : [ "null", "long" ],    
"default" : null  }, {    "name" : "city",    "type" : [ "null", "string" ],    
"default" : null  }, {    "name" : "textField1", ←----- 256 KB before 
compression    "type" : [ "null", "string" ],    "default" : null  }, {    
"name" : "textField2",←----- 256 KB before compression    "type" : [ "null", 
"string" ],    "default" : null  }, {    "name" : "textField3",←----- 512 KB 
before compression    "type" : [ "null", "string" ],    "default" : null  }, {  
  "name" : "textField4",←----- 1024 KB/1 MB before compression    "type" : [ 
"null", "string" ],    "default" : null  }, {    "name" : "decimalField",    
"type" : [ "null", "float" ],    "default" : null  }, {    "name" : 
"longField",    "type" : [ "null", "long" ],    "default" : null  }, {    
"name" : "partition", ←---------- used as partition column    "type" : [ 
"null", "int" ],    "default" : null  }, {    "name" : "incrLongField",    
"type" : [ "null", "long" ],    "default" : null  } ]} {code}
 * COW (copy-on-write) table created with Hudi 1.x
 * *10 partitions, per partition 10GB* worth of data, *total size is 100GB*
 * The fan-out from secIdx -> record_key would be 1, as they are independently 
generated UUIDs.
 * Per record size before compression is around 2 MB.
 * Per index file group size on disk is fine-tuned to be 55-60 MB.
 * Per data block size on disk is 128 MB.

 

Spark 3.5.2. Hudi 1.0.2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to