Hello folks,

I have been debugging this one for a bit now, so I wanted to share my findings
and ask for suggestions.

I have a table with PartitionSpec like so:

[
  batchId: identity(21)
  c_date: identity(22)
]

where c_date is a DATE.
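
For reference, a minimal sketch of how such a spec can be declared against the
table schema. This assumes the com.netflix.iceberg builder API, and schema
stands in for the table's actual Schema:

import com.netflix.iceberg.PartitionSpec

// Identity-partition on batchId and on c_date (a DATE column).
val spec = PartitionSpec.builderFor(schema)
  .identity("batchId")
  .identity("c_date")
  .build()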

How many files/tasks does a full table scan plan?

import com.netflix.iceberg.TableScan
import scala.collection.JavaConverters._

// Plan a full table scan (no filter) and count the planned files and tasks.
val scan = iceTable.newScan()
val files = scan.planFiles.asScala.size
val tasks = scan.planTasks.asScala.size
println("files to scan: " + files)
println("tasks: " + tasks)

files to scan: 29520
tasks: 2826


How many files does a table scan with a partition predicate plan?


import com.netflix.iceberg.TableScan
import scala.collection.JavaConverters._
import com.netflix.iceberg.expressions._

// Equality predicate on the c_date identity partition column.
val exp1 = Expressions.equal("c_date", "2017-11-15")

// Plan a filtered scan and count the planned files and tasks.
val scan = iceTable.newScan().filter(exp1)
val files = scan.planFiles.asScala.size
val tasks = scan.planTasks.asScala.size
println("files to scan: " + files)
println("tasks: " + tasks)

files to scan: 164
tasks: 15


So iceberg-core and iceberg-api are doing the right thing, correctly applying 
my predicate and thus pruning most of the work.


But what is the Spark Reader behavior?

val datePartitionedIcebergDf = spark.read.format("iceberg").load("...")
datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
spark.sql("""
SELECT …
FROM datePartitionedIcebergDf
WHERE …
  AND c_date = '2017-11-15'
GROUP BY …
LIMIT 10
"""
).show


(1) Spark Jobs
Job 0 (Stages: 3/3)
  Stage 0: 2826/2826 succeeded/total tasks
  Stage 1: 200/200 succeeded/total tasks
  Stage 2: 1/1 succeeded/total tasks

Inspecting the physical plan, we see that the predicate does not get pushed
into Iceberg, and that Spark SQL phrases it in a surprising way:

== Physical Plan ==
CollectLimit 21
+- *(3) Project [cast(minute1#227 as string) AS minute1#235, cast(pageviews#228 
as string) AS pageviews#236]
   +- *(3) GlobalLimit 10
      +- Exchange SinglePartition
         +- *(2) LocalLimit 10
            +- *(2) HashAggregate(keys=[minute(timestamp#88, 
Some(Etc/UTC))#243], functions=[finalmerge_sum(merge sum#242) AS 
sum(value#239)#230], output=[minute1#227, pageviews#228])
               +- Exchange hashpartitioning(minute(timestamp#88, 
Some(Etc/UTC))#243, 200)
                  +- *(1) HashAggregate(keys=[minute(timestamp#88, 
Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#243], 
functions=[partial_sum(value#239) AS sum#242], output=[minute(timestamp#88, 
Some(Etc/UTC))#243, sum#242])
                     +- *(1) Project [timestamp#88, 
web#97.webPageDetails.pageViews.value AS value#239]
                        +- *(1) Filter ((cast(c_date#107 as string) = 
2017-11-15) && isnotnull(c_date#107))
                           +- *(1) ScanV2 iceberg[timestamp#88, web#97, 
c_date#107] (Filters: [isnotnull(c_date#107)], Options: 
[path=adl://…/iceberg/,paths=[]])

If we force the literal to be a DATE, then we get the right partition pruning 
behavior:

datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
spark.sql("""
SELECT …
FROM datePartitionedIcebergDf
WHERE c_date = CAST('2017-11-15' AS DATE)
GROUP BY minute1
LIMIT 10
"""
).show

(1) Spark Jobs
Job 5 (Stages: 3/3)
  Stage 15: 15/15 succeeded/total tasks
  Stage 16: 200/200 succeeded/total tasks
  Stage 17: 1/1 succeeded/total tasks


And inspecting the physical plan, we confirm the correct behavior:

== Physical Plan ==
CollectLimit 21
+- *(3) Project [cast(minute1#250 as string) AS minute1#258, cast(pageviews#251 
as string) AS pageviews#259]
   +- *(3) GlobalLimit 10
      +- Exchange SinglePartition
         +- *(2) LocalLimit 10
            +- *(2) HashAggregate(keys=[minute(timestamp#88, 
Some(Etc/UTC))#266], functions=[finalmerge_sum(merge sum#265) AS 
sum(value#262)#253], output=[minute1#250, pageviews#251])
               +- Exchange hashpartitioning(minute(timestamp#88, 
Some(Etc/UTC))#266, 200)
                  +- *(1) HashAggregate(keys=[minute(timestamp#88, 
Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#266], 
functions=[partial_sum(value#262) AS sum#265], output=[minute(timestamp#88, 
Some(Etc/UTC))#266, sum#265])
                     +- *(1) Project [timestamp#88, 
web#97.webPageDetails.pageViews.value AS value#262]
                        +- *(1) Filter ((c_date#107 = 17485) && 
isnotnull(c_date#107))
                           +- *(1) ScanV2 iceberg[timestamp#88, web#97, 
c_date#107] (Filters: [(c_date#107 = 17485), isnotnull(c_date#107)], Options: 
[path=adl://…/iceberg/,paths=[]])
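
As an aside, the DataFrame API gives the same effect, since lit() on a
java.sql.Date produces a DateType literal and no CAST gets introduced. A
minimal sketch (the names here are illustrative, not from the actual job):

import org.apache.spark.sql.functions.{col, lit}

// The literal is already a DATE, so the comparison stays on the DATE column
// and the equality predicate can be pushed down to Iceberg.
val pruned = datePartitionedIcebergDf
  .filter(col("c_date") === lit(java.sql.Date.valueOf("2017-11-15")))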


Discussion:

The original Spark filter is of the form:

CAST( date AS STRING) OP STRING_LITERAL

Iceberg does not understand this today, since it does not support CAST
expressions, so my original query falls back to a full table scan. The problem
is that it is very common for folks to use string literals to express DATEs.
At first I thought: why on earth would Spark phrase the predicate like that,
when it is much cheaper to evaluate this:

c_date = CAST('2017-11-15' AS DATE)

But it turns out that this phrasing would break pre-established semantics:
https://github.com/apache/spark/pull/17174#discussion_r104401305


So I think we have two options:

1) Iceberg, upon seeing CAST( date AS STRING) OP STRING_LITERAL, can try to
rephrase and resolve it in SparkExpressions. Per the discussion in the Spark
thread above, I think this is a safe conversion if we are strict about matching
the predicate as a comparison between a DATE column and a string literal (a
rough sketch follows below).

or

2) Iceberg should start supporting CAST predicates.


(1) is way cheaper and less invasive, but I wonder whether we will see other
instances of similar behavior, or whether a slightly more complicated incoming
predicate would fail to match under approach (1), in which case we should just
bite the bullet and go for (2).
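
For the record, here is a rough sketch of what the strict match in (1) could
look like. This is not the actual SparkExpressions code; the object name, the
Spark 2.x three-argument Cast pattern, and the epoch-day conversion are my own
assumptions:

import java.time.LocalDate

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.{DateType, StringType}

import com.netflix.iceberg.expressions.{Expression => IcebergExpression, Expressions}

object DateCastRewrite {
  // Match exactly CAST(dateCol AS STRING) = 'yyyy-MM-dd' and rewrite it into an
  // Iceberg equality predicate on the underlying DATE column. Anything else
  // returns None, so no other predicate's semantics are touched.
  def convert(expr: Expression): Option[IcebergExpression] = expr match {
    case EqualTo(Cast(attr: AttributeReference, StringType, _), Literal(value, StringType))
        if attr.dataType == DateType =>
      // Iceberg stores DATE values as days from the Unix epoch
      // (2017-11-15 -> 17485, matching the pushed filter above).
      val epochDays = LocalDate.parse(value.toString).toEpochDay.toInt
      Some(Expressions.equal(attr.name, epochDays))
    case _ => None
  }
}

Other comparison operators (and the literal-on-the-left form) would follow the
same pattern.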


Comments/Suggestions?


Xabriel J Collazo Mojica  |  Senior Software Engineer  |  Adobe  |  
xcoll...@adobe.com
