Iceberg doesn't translate Spark filters to Spark expressions. It translates Spark filters to Iceberg expressions: https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java#L150
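For readers following along, that conversion consumes Spark's public sources.Filter objects and produces Iceberg expressions. A minimal sketch of the idea in Scala (simplified and illustrative only, not the actual Reader code, which handles many more filter types):

    import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, IsNotNull}
    import com.netflix.iceberg.expressions.{Expression, Expressions}

    // Simplified: map a pushed-down Spark source Filter to an Iceberg Expression.
    // Anything that cannot be converted stays in Spark, which re-applies it
    // after the scan.
    def toIceberg(filter: Filter): Option[Expression] = filter match {
      case EqualTo(column, value)     => Some(Expressions.equal(column, value))
      case GreaterThan(column, value) => Some(Expressions.greaterThan(column, value))
      case IsNotNull(column)          => Some(Expressions.notNull(column))
      case _                          => None
    }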
So we need Spark to be smarter about the expressions it can convert to filters.

On Mon, Feb 11, 2019 at 11:00 AM Xabriel Collazo Mojica <xcoll...@adobe.com> wrote:

> Thanks for the interest in this issue.
>
> Comments inline below, between <xabriel> </xabriel> tags.
>
> Xabriel J Collazo Mojica | Senior Software Engineer | Adobe | xcoll...@adobe.com
>
> On 2/10/19, 9:17 AM, "Renato Marroquín Mogrovejo" <renatoj.marroq...@gmail.com> wrote:
>
>     Sorry to jump into the thread, but I have a quick question.
>     You are right that CAST("2019-01-01" AS DATE) = date and "2019-01-01" = CAST(date AS STRING) are semantically equivalent, but the first one implies casting a single value whereas the second one implies casting all the values we will encounter. Is this what is intended?
>
> <xabriel>
> Right, this is the surprising behavior from Spark that I pointed out, and you can read more about it in an open Spark PR that I linked to in the original email.
> </xabriel>
>
>     Or will the specific processing engines (Spark, Presto) do the right thing here after Iceberg returns the right files?
>
> <xabriel>
> Spark is the engine that produced the surprising phrasing of casting the DATE to a STRING. But Spark has an interface so that, when a data source can't apply a predicate that is being pushed down, we can tell the engine to reapply it (1). In my case, though, not applying that predicate is very costly.
> (1): https://developer.ibm.com/code/2018/04/16/introducing-apache-spark-data-sources-api-v2/
> </xabriel>
>
>     On Fri, Feb 8, 2019 at 9:52 PM, 'Ryan Blue' via Iceberg Developers (<iceberg-de...@googlegroups.com>) wrote:
>
> > Hi Xabriel,
> >
> > Part of the problem is that the Spark community doesn't want to pass Expression to data source implementations because it is internal. From Spark's perspective, this is a perfectly reasonable thing to do because it has a public API for passing some expressions to the sources, the Filter classes
> > <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.sources.Filter>.
> >
> > I think the right path forward is to update the conversion from Expression to Filter.
>
> <xabriel>
> Ah, I had missed that Iceberg was doing that conversion from Filter back to Spark Expressions. Is this for historical reasons? I agree that, if Spark Expressions are internal, it makes sense to move away from them.
> </xabriel>
>
> > CAST("2019-01-01" AS DATE) = date and "2019-01-01" = CAST(date AS STRING) are equivalent, so we should be able to convert to the right filter.
> >
> > We have other reasons to make this conversion better as well. For example, when partitioning by date_col = CAST(timestamp_col AS DATE), it is common for users to want to use an equality predicate, date_col = "2019-01-01". When that date_col is a hidden partition, PARTITION BY date(timestamp_col), users still want to use an equality predicate instead of a timestamp range. So it would be great if conversion could handle CAST(timestamp_col AS DATE) = "2019-01-01".
> >
> > I'll open a PR on Spark to get these fixes in.
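To make the proposed conversion fix concrete, here is a minimal sketch against Spark 2.4-era Catalyst internals (the Cast and Literal shapes are assumptions that vary by Spark version; this is illustrative, not the actual patch):

    import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EqualTo, Expression, Literal}
    import org.apache.spark.sql.types.{DateType, StringType}

    // Rewrite CAST(date_col AS STRING) = 'literal' into
    // date_col = CAST('literal' AS DATE), a shape the existing
    // Expression-to-Filter translation already understands.
    def rewriteDateCast(pred: Expression): Expression = pred match {
      case EqualTo(Cast(attr: Attribute, StringType, _), lit @ Literal(_, StringType))
          if attr.dataType == DateType =>
        EqualTo(attr, Cast(lit, DateType))
      case other => other
    }

Because the CAST is now over a single literal, constant folding turns the right-hand side into a plain DATE value, and the predicate translates cleanly to a public sources.EqualTo filter.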
> <xabriel>
> Let me make sure I follow: you intend to open a Spark issue to get the Filter interface to also push down CASTs, while we would open an Iceberg PR to stop doing the Spark Filter -> Spark Expression -> Iceberg Expression conversion and start going from Spark Filter directly to Iceberg Expression. Correct?
> </xabriel>
>
> > rb
> >
> > On Thu, Feb 7, 2019 at 11:07 AM Xabriel Collazo Mojica <xcoll...@adobe.com.invalid> wrote:
> >
> >> Hello folks,
> >>
> >> I have been debugging this one for a bit now, so I wanted to share my findings and ask for suggestions.
> >>
> >> I have a table with a PartitionSpec like so:
> >>
> >> [
> >>   batchId: identity(21)
> >>   c_date: identity(22)
> >> ]
> >>
> >> where c_date is a DATE.
> >>
> >> How many files/tasks would a full table scan touch?
> >>
> >> import com.netflix.iceberg.TableScan
> >> import scala.collection.JavaConverters._
> >>
> >> val scan = iceTable.newScan()
> >> var f = 0
> >> scan.planFiles.asScala.foreach( p => f = f + 1 )
> >> var t = 0
> >> scan.planTasks.asScala.foreach( p => t = t + 1 )
> >> println("files to scan: " + f)
> >> println("tasks: " + t)
> >>
> >> files to scan: 29520
> >> tasks: 2826
> >>
> >> How many files would a table scan with a partition predicate touch?
> >>
> >> import com.netflix.iceberg.TableScan
> >> import scala.collection.JavaConverters._
> >> import com.netflix.iceberg.expressions._
> >>
> >> val exp1 = Expressions.equal("c_date", "2017-11-15")
> >>
> >> val scan = iceTable.newScan().filter(exp1)
> >> var f = 0
> >> scan.planFiles.asScala.foreach( p => f = f + 1 )
> >> var t = 0
> >> scan.planTasks.asScala.foreach( p => t = t + 1 )
> >> println("files to scan: " + f)
> >> println("tasks: " + t)
> >>
> >> files to scan: 164
> >> tasks: 15
> >>
> >> So iceberg-core and iceberg-api are doing the right thing, correctly applying my predicate and thus pruning most of the work.
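One reason the string literal works here: when binding the predicate to the schema at scan planning time, Iceberg converts "2017-11-15" to c_date's DATE type, so the comparison is date-to-date. A compact version of the same check (a sketch; iceTable as in the snippets above):

    import scala.collection.JavaConverters._
    import com.netflix.iceberg.expressions.Expressions

    // Iceberg binds the string literal to c_date's DATE type, so manifest
    // entries outside the matching partition are skipped at planning time.
    val pruned = iceTable.newScan().filter(Expressions.equal("c_date", "2017-11-15"))
    println("files to scan: " + pruned.planFiles.asScala.size)  // 164 in the run above
    println("tasks: " + pruned.planTasks.asScala.size)          // 15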
> >> But what is the Spark reader behavior?
> >>
> >> val datePartitionedIcebergDf = spark.read.format("iceberg").load("...")
> >> datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
> >> spark.sql("""
> >>   SELECT …
> >>   WHERE …
> >>   AND c_date = '2017-11-15'
> >>   GROUP BY …
> >>   LIMIT 10
> >> """).show
> >>
> >> (1) Spark Jobs
> >> Job 0 (Stages: 3/3)
> >>   Stage 0: 2826/2826 succeeded / total tasks
> >>   Stage 1: 200/200 succeeded / total tasks
> >>   Stage 2: 1/1 succeeded / total tasks
> >>
> >> Inspecting the physical plan, we see that the predicate does not get pushed into Iceberg, and that Spark SQL phrases it in a surprising way:
> >>
> >> == Physical Plan ==
> >> CollectLimit 21
> >> +- *(3) Project [cast(minute1#227 as string) AS minute1#235, cast(pageviews#228 as string) AS pageviews#236]
> >>    +- *(3) GlobalLimit 10
> >>       +- Exchange SinglePartition
> >>          +- *(2) LocalLimit 10
> >>             +- *(2) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC))#243], functions=[finalmerge_sum(merge sum#242) AS sum(value#239)#230], output=[minute1#227, pageviews#228])
> >>                +- Exchange hashpartitioning(minute(timestamp#88, Some(Etc/UTC))#243, 200)
> >>                   +- *(1) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#243], functions=[partial_sum(value#239) AS sum#242], output=[minute(timestamp#88, Some(Etc/UTC))#243, sum#242])
> >>                      +- *(1) Project [timestamp#88, web#97.webPageDetails.pageViews.value AS value#239]
> >>                         +- *(1) Filter ((cast(c_date#107 as string) = 2017-11-15) && isnotnull(c_date#107))
> >>                            +- *(1) ScanV2 iceberg[timestamp#88, web#97, c_date#107] (Filters: [isnotnull(c_date#107)], Options: [path=adl://…/iceberg/,paths=[]])
> >>
> >> If we force the literal to be a DATE, then we get the right partition pruning behavior:
> >>
> >> datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
> >> spark.sql("""
> >>   SELECT …
> >>   FROM datePartitionedIcebergDf
> >>   WHERE c_date = CAST('2017-11-15' AS DATE)
> >>   GROUP BY minute1
> >>   LIMIT 10
> >> """).show
> >>
> >> (1) Spark Jobs
> >> Job 5 (Stages: 3/3)
> >>   Stage 15: 15/15 succeeded / total tasks
> >>   Stage 16: 200/200 succeeded / total tasks
> >>   Stage 17: 1/1 succeeded / total tasks
> >>
> >> And inspecting the physical plan, we confirm the correct behavior:
> >>
> >> == Physical Plan ==
> >> CollectLimit 21
> >> +- *(3) Project [cast(minute1#250 as string) AS minute1#258, cast(pageviews#251 as string) AS pageviews#259]
> >>    +- *(3) GlobalLimit 10
> >>       +- Exchange SinglePartition
> >>          +- *(2) LocalLimit 10
> >>             +- *(2) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC))#266], functions=[finalmerge_sum(merge sum#265) AS sum(value#262)#253], output=[minute1#250, pageviews#251])
> >>                +- Exchange hashpartitioning(minute(timestamp#88, Some(Etc/UTC))#266, 200)
> >>                   +- *(1) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#266], functions=[partial_sum(value#262) AS sum#265], output=[minute(timestamp#88, Some(Etc/UTC))#266, sum#265])
> >>                      +- *(1) Project [timestamp#88, web#97.webPageDetails.pageViews.value AS value#262]
> >>                         +- *(1) Filter ((c_date#107 = 17485) && isnotnull(c_date#107))
> >>                            +- *(1) ScanV2 iceberg[timestamp#88, web#97, c_date#107] (Filters: [(c_date#107 = 17485), isnotnull(c_date#107)], Options: [path=adl://…/iceberg/,paths=[]])
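A side note for readers of the pruned plan: the 17485 in (c_date#107 = 17485) is the DATE literal in Spark's internal representation, days since the Unix epoch, which is exactly 2017-11-15:

    import java.time.LocalDate

    // Spark stores DATE values internally as days since 1970-01-01.
    LocalDate.parse("2017-11-15").toEpochDay  // 17485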
> >>
> >> Discussion:
> >>
> >> The original Spark filter is of the form:
> >>
> >> CAST(date AS STRING) OP STRING_LITERAL
> >>
> >> which Iceberg, today, doesn't understand, since it does not support CAST expressions. Thus a full table scan is done for my original query. The problem is, it's very common for folks to use literals to express DATEs. At first I thought, why on earth would Spark phrase the predicate like that, since it's way cheaper to evaluate this:
> >>
> >> c_date = CAST('2017-11-15' AS DATE)
> >>
> >> But it turns out that that phrasing would break pre-established semantics:
> >> https://github.com/apache/spark/pull/17174#discussion_r104401305
> >>
> >> So we have two options, I think:
> >>
> >> 1) Iceberg, upon seeing CAST(date AS STRING) OP STRING_LITERAL, can try to rephrase and resolve it on Spark Expressions. Per the discussion in the Spark thread above, I think it's a safe conversion if we are strict about matching the predicate to be a comparison between a DATE and a literal.
> >>
> >> or
> >>
> >> 2) Iceberg should start supporting CAST predicates.
> >>
> >> (1) is way cheaper and less invasive, but I wonder if we will see other instances of similar behavior, or if an incoming predicate just a bit more complicated would fail to match with the approach in (1), and thus we should just bite the bullet and go for (2).
> >>
> >> Comments/Suggestions?
> >>
> >> Xabriel J Collazo Mojica | Senior Software Engineer | Adobe | xcoll...@adobe.com
> >
> > --
> > Ryan Blue
> > Software Engineer
> > Netflix
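As a user-side workaround until either option lands, the same forcing of the DATE literal shown earlier in the thread can also be phrased through the DataFrame API; a sketch, assuming datePartitionedIcebergDf as defined above:

    import org.apache.spark.sql.functions.{col, lit}

    // Cast the single literal, not the column, so Spark compares DATE to DATE
    // and the predicate can be pushed down to the source.
    val pruned = datePartitionedIcebergDf.filter(col("c_date") === lit("2017-11-15").cast("date"))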
--
Ryan Blue
Software Engineer
Netflix