Hello folks, I have been debugging this one for a bit now, so I wanted to share my findings and ask for suggestions.
I have a table with a PartitionSpec like so:

  [ batchId: identity(21), c_date: identity(22) ]

where c_date is a DATE.

First, how many files/tasks does a full table scan plan?

  import com.netflix.iceberg.TableScan
  import scala.collection.JavaConverters._

  val scan = iceTable.newScan()

  var f = 0
  scan.planFiles.asScala.foreach(p => f = f + 1)
  var t = 0
  scan.planTasks.asScala.foreach(p => t = t + 1)

  println("files to scan: " + f)
  println("tasks: " + t)

  files to scan: 29520
  tasks: 2826

And how many files/tasks does a scan with a partition predicate plan?

  import com.netflix.iceberg.TableScan
  import scala.collection.JavaConverters._
  import com.netflix.iceberg.expressions._

  val exp1 = Expressions.equal("c_date", "2017-11-15")
  val scan = iceTable.newScan().filter(exp1)

  var f = 0
  scan.planFiles.asScala.foreach(p => f = f + 1)
  var t = 0
  scan.planTasks.asScala.foreach(p => t = t + 1)

  println("files to scan: " + f)
  println("tasks: " + t)

  files to scan: 164
  tasks: 15

So iceberg-core and iceberg-api are doing the right thing: they correctly apply my predicate and prune most of the work. But what is the Spark reader's behavior?

  val datePartitionedIcebergDf = spark.read.format("iceberg").load("...")
  datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")

  spark.sql("""
    SELECT …
    WHERE … AND c_date = '2017-11-15'
    GROUP BY …
    LIMIT 10
  """).show

  Spark Jobs
  Job 0 (3/3 stages)
    Stage 0: 2826/2826 tasks succeeded
    Stage 1: 200/200 tasks succeeded
    Stage 2: 1/1 tasks succeeded

Inspecting the physical plan, we see that the predicate does not get pushed into Iceberg, and that Spark SQL phrases it in a surprising way:

  == Physical Plan ==
  CollectLimit 21
  +- *(3) Project [cast(minute1#227 as string) AS minute1#235, cast(pageviews#228 as string) AS pageviews#236]
     +- *(3) GlobalLimit 10
        +- Exchange SinglePartition
           +- *(2) LocalLimit 10
              +- *(2) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC))#243], functions=[finalmerge_sum(merge sum#242) AS sum(value#239)#230], output=[minute1#227, pageviews#228])
                 +- Exchange hashpartitioning(minute(timestamp#88, Some(Etc/UTC))#243, 200)
                    +- *(1) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#243], functions=[partial_sum(value#239) AS sum#242], output=[minute(timestamp#88, Some(Etc/UTC))#243, sum#242])
                       +- *(1) Project [timestamp#88, web#97.webPageDetails.pageViews.value AS value#239]
                          +- *(1) Filter ((cast(c_date#107 as string) = 2017-11-15) && isnotnull(c_date#107))
                             +- *(1) ScanV2 iceberg[timestamp#88, web#97, c_date#107] (Filters: [isnotnull(c_date#107)], Options: [path=adl://…/iceberg/,paths=[]])

If we force the literal to be a DATE, then we get the right partition pruning behavior:

  spark.sql("""
    SELECT …
    FROM datePartitionedIcebergDf
    WHERE c_date = CAST('2017-11-15' AS DATE)
    GROUP BY minute1
    LIMIT 10
  """).show

  Spark Jobs
  Job 5 (3/3 stages)
    Stage 15: 15/15 tasks succeeded
    Stage 16: 200/200 tasks succeeded
    Stage 17: 1/1 tasks succeeded

And inspecting the physical plan, we confirm the correct behavior:

  == Physical Plan ==
  CollectLimit 21
  +- *(3) Project [cast(minute1#250 as string) AS minute1#258, cast(pageviews#251 as string) AS pageviews#259]
     +- *(3) GlobalLimit 10
        +- Exchange SinglePartition
           +- *(2) LocalLimit 10
              +- *(2) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC))#266], functions=[finalmerge_sum(merge sum#265) AS sum(value#262)#253], output=[minute1#250, pageviews#251])
                 +- Exchange hashpartitioning(minute(timestamp#88, Some(Etc/UTC))#266, 200)
                    +- *(1) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#266], functions=[partial_sum(value#262) AS sum#265], output=[minute(timestamp#88, Some(Etc/UTC))#266, sum#265])
                       +- *(1) Project [timestamp#88, web#97.webPageDetails.pageViews.value AS value#262]
                          +- *(1) Filter ((c_date#107 = 17485) && isnotnull(c_date#107))
                             +- *(1) ScanV2 iceberg[timestamp#88, web#97, c_date#107] (Filters: [(c_date#107 = 17485), isnotnull(c_date#107)], Options: [path=adl://…/iceberg/,paths=[]])
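As a quick sanity check on that plan: the 17485 in the pushed-down filter is just 2017-11-15 encoded as days since the Unix epoch, which is the internal DATE representation, e.g.:

  // 2017-11-15 as days since 1970-01-01 -- the same value that shows up
  // in the pushed-down filter once the literal is typed as a DATE
  java.time.LocalDate.parse("2017-11-15").toEpochDay
  // res: Long = 17485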
Discussion:

The original Spark filter is of the form:

  CAST(date AS STRING) OP STRING_LITERAL

which Iceberg today doesn't understand, since it does not support CAST expressions. Thus a full table scan is done for my original query. The problem is that it's very common for folks to use string literals to express DATEs.

At first I thought: why on earth would Spark phrase the predicate like that, since it's way cheaper to evaluate this:

  c_date = CAST('2017-11-15' AS DATE)

But it turns out that that phrasing would break pre-established semantics:
https://github.com/apache/spark/pull/17174#discussion_r104401305

So I think we have two options:

1) Upon seeing CAST(date AS STRING) OP STRING_LITERAL, Iceberg can try to rephrase and resolve it in SparkExpressions. As per the discussion in the Spark thread above, I think it's a safe conversion as long as we are strict about matching the predicate to a comparison between a DATE column and a literal. (A rough sketch of what this could look like is in the P.S. below.)

or

2) Iceberg starts supporting CAST predicates.

(1) is way cheaper and less invasive, but I wonder whether we will see other instances of similar behavior, or whether a slightly more complicated incoming predicate would fail to match under (1), in which case we should just bite the bullet and go for (2).

Comments/Suggestions?

Xabriel J Collazo Mojica | Senior Software Engineer | Adobe | xcoll...@adobe.com
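P.S. To make (1) a bit more concrete, here is a rough, untested sketch of the kind of rewrite I have in mind, written against Spark 2.3's Catalyst expressions (the object and helper names are illustrative only, the Cast pattern's arity differs between Spark versions, and the real logic would presumably live in or next to SparkExpressions):

  import java.time.LocalDate

  import scala.util.Try

  import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, EqualTo, Expression, Literal}
  import org.apache.spark.sql.types.{DateType, StringType}
  import org.apache.spark.unsafe.types.UTF8String

  object DateCastRewrite {

    // Rewrites  CAST(dateCol AS STRING) = 'yyyy-MM-dd'  into an equality on the
    // DATE column itself (days since epoch), which can then be translated into
    // an Iceberg predicate. Anything that does not match this exact shape is
    // returned untouched, so the worst case is today's behavior (a full scan).
    def rewrite(expr: Expression): Expression = expr match {
      case EqualTo(Cast(attr: AttributeReference, StringType, _), Literal(s: UTF8String, StringType))
          if attr.dataType == DateType =>
        asEpochDays(s.toString).map(d => EqualTo(attr, Literal(d, DateType))).getOrElse(expr)

      // same predicate, with the literal on the left-hand side
      case EqualTo(Literal(s: UTF8String, StringType), Cast(attr: AttributeReference, StringType, _))
          if attr.dataType == DateType =>
        asEpochDays(s.toString).map(d => EqualTo(attr, Literal(d, DateType))).getOrElse(expr)

      case other => other
    }

    // Strict ISO parse: only canonical 'yyyy-MM-dd' strings convert, which is the
    // form CAST(date AS STRING) produces, so equality semantics are preserved.
    private def asEpochDays(s: String): Option[Int] =
      Try(LocalDate.parse(s).toEpochDay.toInt).toOption
  }

The strict parse is the safety valve: if the literal is anything other than a canonical date string, the predicate is left alone and we fall back to the current behavior, so the rewrite can only improve pruning, never change results.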