Got it. Thanks for the clarification, Ryan.

I think I got confused by this convert() method: 
https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/SparkExpressions.java#L106
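
For my own notes: the Filter-to-Iceberg-Expression direction Ryan describes
below would look roughly like this. This is only a sketch to check my
understanding, not the actual Reader.java code, which handles many more cases:

import com.netflix.iceberg.expressions.{Expression, Expressions}
import org.apache.spark.sql.sources._

// Spark's public Filter classes map onto Iceberg expressions directly;
// anything we don't recognize yields None and is left for Spark to
// re-apply on top of the scan.
def convert(filter: Filter): Option[Expression] = filter match {
  case EqualTo(attr, value)     => Some(Expressions.equal(attr, value))
  case GreaterThan(attr, value) => Some(Expressions.greaterThan(attr, value))
  case IsNotNull(attr)          => Some(Expressions.notNull(attr))
  case And(left, right)         =>
    for (l <- convert(left); r <- convert(right)) yield Expressions.and(l, r)
  case _                        => None
}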

Xabriel J Collazo Mojica  |  Senior Software Engineer  |  Adobe  |  
xcoll...@adobe.com

From: Ryan Blue <rb...@netflix.com>
Reply-To: "rb...@netflix.com" <rb...@netflix.com>
Date: Tuesday, February 12, 2019 at 6:23 PM
To: Xabriel Collazo Mojica <xcoll...@adobe.com>
Cc: "dev@iceberg.apache.org" <dev@iceberg.apache.org>, 
"iceberg-de...@googlegroups.com" <iceberg-de...@googlegroups.com>
Subject: Re: Predicates against DATEs with String Literals coming from Spark 
are ignored by Iceberg

Iceberg doesn't translate Spark filters to Spark expressions. It translates 
Spark filters to Iceberg expressions: 
https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java#L150

So we need Spark to be smarter about the expressions it can convert to filters.

On Mon, Feb 11, 2019 at 11:00 AM Xabriel Collazo Mojica 
<xcoll...@adobe.com> wrote:
Thanks for the interest in this issue.

Comments inline below, between <xabriel> </xabriel> tags.

Xabriel J Collazo Mojica  |  Senior Software Engineer  |  Adobe  |  
xcoll...@adobe.com


On 2/10/19, 9:17 AM, "Renato Marroquín Mogrovejo" 
<renatoj.marroq...@gmail.com> wrote:

    Sorry to jump into the thread, but I have a quick question.
    You are right that CAST("2019-01-01" AS DATE) = date and "2019-01-01" =
    CAST(date AS STRING) are semantically equivalent, but the first one implies
    casting a single value whereas the second one implies casting all possible
    values that we will encounter. Is this what is intended?

<xabriel>
Right, this is the surprising behavior from Spark that I pointed out, and you 
can read more about it in the open Spark PR that I linked to in the original 
email.
</xabriel>

    Or will specific
    processing engines (Spark, Presto) do the right thing here after Iceberg
    returns the right files?

<xabriel>
Spark is the engine that produced the surprising phrasing of casting the DATE 
to a STRING. But Spark has an interface so that, when a data source can't apply 
a predicate that is being pushed down, we can tell the engine to reapply it (1). 
In my case, though, not applying that predicate is very costly.
(1): https://developer.ibm.com/code/2018/04/16/introducing-apache-spark-data-sources-api-v2/
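
Concretely, for DataSourceV2 this is the filter push-down mixin; its contract, 
simplified from memory (the trait name here is mine, the real one is 
SupportsPushDownFilters):

import org.apache.spark.sql.sources.Filter

trait PushDownContract {
  // Receives the candidate filters; returns the subset the source could
  // NOT handle, which Spark then re-applies itself after the scan.
  def pushFilters(filters: Array[Filter]): Array[Filter]
  // Reports the filters the source accepted; these show up in the plan's
  // "Filters:" list.
  def pushedFilters(): Array[Filter]
}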
</xabriel>

    On Fri, Feb 8, 2019 at 9:52 PM, 'Ryan Blue' via Iceberg Developers
    (<iceberg-de...@googlegroups.com>) wrote:

    > Hi Xabriel,
    > Part of the problem is that the Spark community doesn’t want to pass
    > Expression to data source implementations because it is internal. From
    > Spark’s perspective, this is a perfectly reasonable thing to do because it
    > has a public API for passing some expressions to the sources, the Filter
    > classes
    > 
    > <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.sources.Filter>
    > .
    >
    > I think the right path forward is to update the conversion from Expression
    > to Filter.

<xabriel>
Ah, I had missed that Iceberg was doing that conversion from Filter back to 
Spark Expressions. Is this for historical reasons? I agree that, if Spark 
Expressions are internal, it makes sense to move away from them.
</xabriel>

CAST("2019-01-01" AS DATE) = date and "2019-01-01" = CAST(date
    > AS STRING) are equivalent, so we should be able to convert to the right
    > filter.
    >
    > We have other reasons to make this conversion better as well. For example,
    > when partitioning by date_col = CAST(timestamp_col AS DATE), it is common
    > for users to want to use an equality predicate, date_col = "2019-01-01".
    > When that date_col is a hidden partition, PARTITION BY date(timestamp_col),
    > users still want to use an equality predicate instead of a timestamp range.
    > So it would be great if conversion could handle CAST(timestamp_col AS
    > DATE) = "2019-01-01".
    >
    > I’ll open a PR on Spark to get these fixes in.

<xabriel>
Let me make sure I follow: You intend to open a Spark Issue to get the Filter 
interface to also push down CASTs, while we would open an Iceberg PR to stop 
doing the Spark Filters > Spark Expressions > Iceberg Expressions conversion 
and start going from Spark Filter directly to Iceberg Expressions. Correct?
</xabriel>

    >
    > rb
    >
    > On Thu, Feb 7, 2019 at 11:07 AM Xabriel Collazo Mojica
    > <xcoll...@adobe.com.invalid> wrote:
    >
    >> Hello folks,
    >>
    >> I have been debugging this one for a bit now, so wanted to share my
    >> findings, and ask for suggestions.
    >>
    >> I have a table with PartitionSpec like so:
    >>
    >> [
    >>   batchId: identity(21)
    >>   c_date: identity(22)
    >> ]
    >>
    >> where c_date is a DATE.
    >>
    >> How many files/tasks would a full table scan touch?
    >>
    >> import com.netflix.iceberg.TableScan
    >> import scala.collection.JavaConverters._
    >>
    >> val scan = iceTable.newScan()
    >> var f = 0
    >> scan.planFiles.asScala.foreach( p => f = f + 1 )
    >> var t = 0
    >> scan.planTasks.asScala.foreach( p => t = t + 1 )
    >> println("files to scan: " + f)
    >> println("tasks: " + t)
    >>
    >> files to scan: 29520
    >> tasks: 2826
    >>
    >>
    >> How many files would a table scan with a partition predicate scan?
    >>
    >>
    >> import com.netflix.iceberg.TableScan
    >> import scala.collection.JavaConverters._
    >> import com.netflix.iceberg.expressions._
    >>
    >> val exp1 = Expressions.equal("c_date", "2017-11-15")
    >>
    >> val scan = iceTable.newScan().filter(exp1)
    >> var f = 0
    >> scan.planFiles.asScala.foreach( p => f = f + 1 )
    >> var t = 0
    >> scan.planTasks.asScala.foreach( p => t = t + 1 )
    >> println("files to scan: " + f)
    >> println("tasks: " + t)
    >>
    >> files to scan: 164
    >> tasks: 15
    >>
    >>
    >> So iceberg-core and iceberg-api are doing the right thing, correctly
    >> applying my predicate and thus pruning most of the work.
    >>
    >>
    >> But what is the Spark Reader behavior?
    >>
    >> val datePartitionedIcebergDf = spark.read.format("iceberg").load("...")
    >>
    >> datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
    >> spark.sql("""
    >> SELECT …
    >> FROM datePartitionedIcebergDf
    >> WHERE …
    >>   AND  c_date = '2017-11-15'
    >> GROUP BY …
    >> LIMIT 10
    >> """
    >> ).show
    >>
    >>
    >> (1) Spark Jobs
    >> Job 0 (Stages: 3/3)
    >> Stage 0:
    >> 2826/2826
    >> succeeded / total tasks
    >> Stage 1:
    >> 200/200
    >> succeeded / total tasks
    >> Stage 2:
    >> 1/1
    >> succeeded / total tasks
    >>
    >> Inspecting the physical plan, we see that the predicate does not get
    >> pushed into Iceberg (Stage 0 ran all 2826 tasks, matching the full-table
    >> scan above), and that Spark SQL phrases it in a surprising way:
    >>
    >> == Physical Plan ==
    >> CollectLimit 21
    >> +- *(3) Project [cast(minute1#227 as string) AS minute1#235,
    >> cast(pageviews#228 as string) AS pageviews#236]
    >>    +- *(3) GlobalLimit 10
    >>       +- Exchange SinglePartition
    >>          +- *(2) LocalLimit 10
    >>             +- *(2) HashAggregate(keys=[minute(timestamp#88,
    >> Some(Etc/UTC))#243], functions=[finalmerge_sum(merge sum#242) AS
    >> sum(value#239)#230], output=[minute1#227, pageviews#228])
    >>                +- Exchange hashpartitioning(minute(timestamp#88,
    >> Some(Etc/UTC))#243, 200)
    >>                   +- *(1) HashAggregate(keys=[minute(timestamp#88,
    >> Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#243],
    >> functions=[partial_sum(value#239) AS sum#242], output=[minute(timestamp#88,
    >> Some(Etc/UTC))#243, sum#242])
    >>                      +- *(1) Project [timestamp#88,
    >> web#97.webPageDetails.pageViews.value AS value#239]
    >>                         +- *(1) Filter ((cast(c_date#107 as string) =
    >> 2017-11-15) && isnotnull(c_date#107))
    >>                            +- *(1) ScanV2 iceberg[timestamp#88, web#97,
    >> c_date#107] (Filters: [isnotnull(c_date#107)], Options:
    >> [path=adl://…/iceberg/,paths=[]])
    >>
    >> If we force the literal to be a DATE, then we get the right partition
    >> pruning behavior:
    >>
    >>
    >> datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
    >> spark.sql("""
    >> SELECT …
    >> FROM datePartitionedIcebergDf
    >> WHERE c_date = CAST('2017-11-15' AS DATE)
    >> GROUP BY minute1
    >> LIMIT 10
    >> """
    >> ).show
    >>
    >> (1) Spark Jobs
    >> Job 5 (Stages: 3/3)
    >> Stage 15:
    >> 15/15
    >> succeeded / total tasks
    >> Stage 16:
    >> 200/200
    >> succeeded / total tasks
    >> Stage 17:
    >> 1/1
    >> succeeded / total tasks
    >>
    >>
    >> And inspecting the physical plan, we confirm that the predicate is now
    >> pushed into the scan:
    >>
    >> == Physical Plan ==
    >> CollectLimit 21
    >> +- *(3) Project [cast(minute1#250 as string) AS minute1#258,
    >> cast(pageviews#251 as string) AS pageviews#259]
    >>    +- *(3) GlobalLimit 10
    >>       +- Exchange SinglePartition
    >>          +- *(2) LocalLimit 10
    >>             +- *(2) HashAggregate(keys=[minute(timestamp#88,
    >> Some(Etc/UTC))#266], functions=[finalmerge_sum(merge sum#265) AS
    >> sum(value#262)#253], output=[minute1#250, pageviews#251])
    >>                +- Exchange hashpartitioning(minute(timestamp#88,
    >> Some(Etc/UTC))#266, 200)
    >>                   +- *(1) HashAggregate(keys=[minute(timestamp#88,
    >> Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#266],
    >> functions=[partial_sum(value#262) AS sum#265], output=[minute(timestamp#88,
    >> Some(Etc/UTC))#266, sum#265])
    >>                      +- *(1) Project [timestamp#88,
    >> web#97.webPageDetails.pageViews.value AS value#262]
    >>                         +- *(1) Filter ((c_date#107 = 17485) &&
    >> isnotnull(c_date#107))
    >>                            +- *(1) ScanV2 iceberg[timestamp#88, web#97,
    >> c_date#107] (Filters: [(c_date#107 = 17485), isnotnull(c_date#107)],
    >> Options: [path=adl://…/iceberg/,paths=[]])
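
<xabriel>
Side note for readers: 17485 is just 2017-11-15 encoded as days since the Unix 
epoch, which is how Spark represents DATE values internally. For example:

import java.time.LocalDate
LocalDate.ofEpochDay(17485)  // == 2017-11-15
</xabriel>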
    >>
    >>
    >> Discussion:
    >>
    >> The original Spark filter is of the form:
    >>
    >> CAST( date AS STRING) OP STRING_LITERAL
    >>
    >> Which Iceberg, today, doesn’t understand, since it does not support CAST
    >> expressions. Thus a full table scan is done for my original query. The
    >> problem is, it’s very common for folks to use literals to express DATEs.
    >> At first I thought, why on earth would Spark phrase the predicate like
    >> that, since it’s way cheaper to evaluate this:
    >>
    >> c_date = CAST('2017-11-15' AS DATE)
    >>
    >> But it turns out that phrasing would break pre-established semantics:
    >> https://github.com/apache/spark/pull/17174#discussion_r104401305
    >>
    >>
    >> So we have two options I think:
    >>
    >> 1) Iceberg, upon seeing a CAST(date AS STRING) OP STRING_LITERAL, can try
    >> to rephrase and resolve it in SparkExpressions. As per the discussion in
    >> the Spark thread above, I think it’s a safe conversion if we are strict
    >> about matching the predicate to be a comparison between a DATE and a
    >> literal.
    >>
    >> or
    >>
    >> 2) Iceberg should start supporting CAST predicates.
    >>
    >>
    >> (1) is way cheaper and less invasive, but I wonder if we will see other
    >> instances of similar behavior, or if the incoming predicate is just a bit
    >> more complicated and we would miss matching it with the approach in (1), and
    >> thus should just bite the bullet and go for (2).
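
<xabriel>
To make (1) concrete, here is a rough sketch at the Catalyst level. The class 
names (Cast, EqualTo, Literal) are real Catalyst expressions, but this is 
illustrative only, not a tested patch:

import org.apache.spark.sql.catalyst.expressions.{Cast, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.{DateType, StringType}

// Rewrite CAST(dateCol AS STRING) = 'yyyy-MM-dd' (either operand order)
// into dateCol = CAST('yyyy-MM-dd' AS DATE), which can then be pushed
// down as an ordinary date equality. We match strictly: a DATE column
// cast to STRING, compared against a string literal, per the Spark PR
// discussion above.
def rewriteDateCast(expr: Expression): Expression = expr match {
  case EqualTo(Cast(col, StringType, _), lit @ Literal(_, StringType))
      if col.dataType == DateType =>
    EqualTo(col, Cast(lit, DateType))
  case EqualTo(lit @ Literal(_, StringType), Cast(col, StringType, _))
      if col.dataType == DateType =>
    EqualTo(col, Cast(lit, DateType))
  case other => other
}
</xabriel>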
    >>
    >>
    >> Comments/Suggestions?
    >>
    >>
    >> Xabriel J Collazo Mojica  |  Senior Software Engineer  |  Adobe  |
    >> xcoll...@adobe.com
    >>
    >
    >
    > --
    > Ryan Blue
    > Software Engineer
    > Netflix
    >



--
Ryan Blue
Software Engineer
Netflix
