I added SparkExpressions because Spark, at one point, could pass an
expression instead of a filter. The Spark community decided to remove that
for 2.4.0, though, so we moved back to using filters.
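
The filter path pattern matches Spark's public Filter classes and builds
Iceberg expressions (see the Reader.java link in the quoted message below
for the real conversion). A minimal, illustrative sketch of the idea, not
the exact code:

  import org.apache.spark.sql.sources._
  import com.netflix.iceberg.expressions.{Expression, Expressions}

  // Convert one Spark Filter into an Iceberg Expression. Returns None for
  // filters we can't translate, so the caller can ask Spark to re-apply
  // them after the scan.
  def convert(filter: Filter): Option[Expression] = filter match {
    case EqualTo(col, value)     => Some(Expressions.equal(col, value))
    case GreaterThan(col, value) => Some(Expressions.greaterThan(col, value))
    case LessThan(col, value)    => Some(Expressions.lessThan(col, value))
    case IsNotNull(col)          => Some(Expressions.notNull(col))
    case And(left, right)        =>
      for (l <- convert(left); r <- convert(right))
        yield Expressions.and(l, r)
    case _                       => None
  }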

On Wed, Feb 13, 2019 at 9:11 AM Xabriel Collazo Mojica <xcoll...@adobe.com>
wrote:

> Got it. Thanks for the clarification Ryan.
>
>
>
> I think I got confused by this convert() method:
> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/SparkExpressions.java#L106
>
>
>
> Xabriel J Collazo Mojica  |  Senior Software Engineer  |  Adobe  |
> xcoll...@adobe.com
>
>
>
> From: Ryan Blue <rb...@netflix.com>
> Reply-To: "rb...@netflix.com" <rb...@netflix.com>
> Date: Tuesday, February 12, 2019 at 6:23 PM
> To: Xabriel Collazo Mojica <xcoll...@adobe.com>
> Cc: "dev@iceberg.apache.org" <dev@iceberg.apache.org>,
> "iceberg-de...@googlegroups.com" <iceberg-de...@googlegroups.com>
> Subject: Re: Predicates against DATEs with String Literals coming from
> Spark are ignored by Iceberg
>
>
>
> Iceberg doesn't translate Spark filters to Spark expressions. It
> translates Spark filters to Iceberg expressions:
> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java#L150
>
>
>
> So we need Spark to be smarter about the expressions it can convert to
> filters.
>
>
>
> On Mon, Feb 11, 2019 at 11:00 AM Xabriel Collazo Mojica <
> xcoll...@adobe.com> wrote:
>
> Thanks for the interest in this issue.
>
> Comments inline below, between <xabriel> </xabriel> tags.
>
> Xabriel J Collazo Mojica  |  Senior Software Engineer  |  Adobe  |
> xcoll...@adobe.com
>
>
> On 2/10/19, 9:17 AM, "Renato Marroquín Mogrovejo" <
> renatoj.marroq...@gmail.com> wrote:
>
>     Sorry to jump into the thread, but I have a quick question.
>     You are right that CAST("2019-01-01" AS DATE) = date and "2019-01-01" =
>     CAST(date AS STRING) are semantically equivalent, but the first one
>     implies casting a single value whereas the second one implies casting
>     all possible values that we will encounter. Is this what is intended?
>
> <xabriel>
> Right, this is the surprising behavior that I pointed out from Spark; you
> can read more about it in the open Spark PR that I linked to in the
> original email.
> </xabriel>
>
>     Or will specific processing engines (Spark, Presto) do the right thing
>     here after Iceberg returns the right files?
>
> <xabriel>
> Spark is the engine that produced the surprising phrasing of casting the
> DATE to a STRING. Spark does have an interface so that, when a data source
> can't apply a predicate that is being pushed down, we can tell the engine
> to reapply it (1). But in my case, not applying that predicate is very
> costly.
>
> (1):
> https://developer.ibm.com/code/2018/04/16/introducing-apache-spark-data-sources-api-v2/
> </xabriel>
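>
> For anyone following along: in Spark 2.4's DataSourceV2, that interface is
> SupportsPushDownFilters. pushFilters returns the filters the source could
> not handle, and Spark re-applies those after the scan. A rough sketch (the
> convertible() helper is hypothetical):
>
>   import org.apache.spark.sql.sources.Filter
>   import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters
>
>   trait FilterPushdown extends SupportsPushDownFilters {
>     private var pushed: Array[Filter] = Array.empty
>
>     // Keep what we can convert; hand the rest back so Spark re-applies it.
>     override def pushFilters(filters: Array[Filter]): Array[Filter] = {
>       val (accepted, residual) = filters.partition(convertible)
>       pushed = accepted
>       residual
>     }
>
>     override def pushedFilters(): Array[Filter] = pushed
>
>     // Hypothetical helper: can this source evaluate the given filter?
>     protected def convertible(f: Filter): Boolean
>   }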
>
>     On Fri, Feb 8, 2019 at 21:52, 'Ryan Blue' via Iceberg Developers (<
>     iceberg-de...@googlegroups.com>) wrote:
>
>     > Hi Xabriel,
>     > Part of the problem is that the Spark community doesn't want to pass
>     > Expression to data source implementations because it is internal. From
>     > Spark's perspective, this is a perfectly reasonable thing to do because
>     > it has a public API for passing some expressions to the sources, the
>     > Filter classes
>     > <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.sources.Filter>.
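>     >
>     > To make that concrete with the plans further down this thread, this is
>     > roughly what the source sees for the two phrasings (illustrative):
>     >
>     >   // c_date = CAST('2017-11-15' AS DATE)
>     >   //   -> pushed: [IsNotNull("c_date"),
>     >   //              EqualTo("c_date", Date.valueOf("2017-11-15"))]
>     >   // CAST(c_date AS STRING) = '2017-11-15'
>     >   //   -> pushed: [IsNotNull("c_date")]  // equality has no Filter form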
>     >
>     > I think the right path forward is to update the conversion from
>     > Expression to Filter.
>
> <xabriel>
> Ah, I had missed that Iceberg was doing that conversion from Filter back
> to Spark Expressions. Is this for historical reasons? I agree that, if
> Spark Expressions are internal, it makes sense to move away from them.
> </xabriel>
>
> CAST("2019-01-01" AS DATE) = date and "2019-01-01" = CAST(date
>     > AS STRING) are equivalent, so we should be able to convert to the
> right
>     > filter.
>     >
>     > We have other reasons to make this conversion better as well. For
>     > example, when partitioning by date_col = CAST(timestamp_col AS DATE),
>     > it is common for users to want to use an equality predicate, date_col =
>     > "2019-01-01". When that date_col is a hidden partition, PARTITION BY
>     > date(timestamp_col), users still want to use an equality predicate
>     > instead of a timestamp range. So it would be great if conversion could
>     > handle CAST(timestamp_col AS DATE) = "2019-01-01".
>     >
>     > I’ll open a PR on Spark to get these fixes in.
>
> <xabriel>
> Let me make sure I follow: you intend to open a Spark issue to get the
> Filter interface to also push down CASTs, while we would open an Iceberg PR
> to stop doing the Spark Filter -> Spark Expression -> Iceberg Expression
> conversion and start going from Spark Filter directly to Iceberg
> Expression. Correct?
> </xabriel>
>
>     >
>     > rb
>     >
>     > On Thu, Feb 7, 2019 at 11:07 AM Xabriel Collazo Mojica
>     > <xcoll...@adobe.com.invalid> wrote:
>     >
>     >> Hello folks,
>     >>
>     >> I have been debugging this one for a bit now, so wanted to share my
>     >> findings, and ask for suggestions.
>     >>
>     >> I have a table with PartitionSpec like so:
>     >>
>     >> [
>     >>   batchId: identity(21)
>     >>   c_date: identity(22)
>     >> ]
>     >>
>     >> where c_date is a DATE.
>     >>
>     >> How many files/tasks would a full table scan plan?
>     >>
>     >> import com.netflix.iceberg.TableScan
>     >> import scala.collection.JavaConverters._
>     >>
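>     >> // iceTable: an Iceberg Table assumed to be loaded earlier (not shown)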
>     >> val scan = iceTable.newScan()
>     >> var f = 0
>     >> scan.planFiles.asScala.foreach( p => f = f + 1 )
>     >> var t = 0
>     >> scan.planTasks.asScala.foreach( p => t = t + 1 )
>     >> println("files to scan: " + f)
>     >> println("tasks: " + t)
>     >>
>     >> files to scan: 29520
>     >> tasks: 2826
>     >>
>     >>
>     >> How many files would a table scan with a partition predicate plan?
>     >>
>     >>
>     >> import com.netflix.iceberg.TableScan
>     >> import scala.collection.JavaConverters._
>     >> import com.netflix.iceberg.expressions._
>     >>
>     >> val exp1 = Expressions.equal("c_date", "2017-11-15")
>     >>
>     >> val scan = iceTable.newScan().filter(exp1)
>     >> var f = 0
>     >> scan.planFiles.asScala.foreach( p => f = f + 1 )
>     >> var t = 0
>     >> scan.planTasks.asScala.foreach( p => t = t + 1 )
>     >> println("files to scan: " + f)
>     >> println("tasks: " + t)
>     >>
>     >> files to scan: 164
>     >> tasks: 15
>     >>
>     >>
>     >> So iceberg-core and iceberg-api are doing the right thing, correctly
>     >> applying my predicate and thus pruning most of the work.
>     >>
>     >>
>     >> But what is the Spark Reader behavior?
>     >>
>     >> val datePartitionedIcebergDf =
>     >>   spark.read.format("iceberg").load("...")
>     >>
>     >> datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
>     >> spark.sql("""
>     >> SELECT …
>     >> FROM datePartitionedIcebergDf
>     >> WHERE …
>     >>   AND  c_date = '2017-11-15'
>     >> GROUP BY …
>     >> LIMIT 10
>     >> """
>     >> ).show
>     >>
>     >>
>     >> (1) Spark Jobs
>     >> Job 0 (3/3 stages)
>     >>   Stage 0: 2826/2826 succeeded/total tasks
>     >>   Stage 1: 200/200 succeeded/total tasks
>     >>   Stage 2: 1/1 succeeded/total tasks
>     >>
>     >> Inspecting the physical plan, we see that the predicate does not get
>     >> pushed into Iceberg, and that it is phrased in a surprising way by
>     >> Spark SQL:
>     >>
>     >> == Physical Plan ==
>     >> CollectLimit 21
>     >> +- *(3) Project [cast(minute1#227 as string) AS minute1#235, cast(pageviews#228 as string) AS pageviews#236]
>     >>    +- *(3) GlobalLimit 10
>     >>       +- Exchange SinglePartition
>     >>          +- *(2) LocalLimit 10
>     >>             +- *(2) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC))#243], functions=[finalmerge_sum(merge sum#242) AS sum(value#239)#230], output=[minute1#227, pageviews#228])
>     >>                +- Exchange hashpartitioning(minute(timestamp#88, Some(Etc/UTC))#243, 200)
>     >>                   +- *(1) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#243], functions=[partial_sum(value#239) AS sum#242], output=[minute(timestamp#88, Some(Etc/UTC))#243, sum#242])
>     >>                      +- *(1) Project [timestamp#88, web#97.webPageDetails.pageViews.value AS value#239]
>     >>                         +- *(1) Filter ((cast(c_date#107 as string) = 2017-11-15) && isnotnull(c_date#107))
>     >>                            +- *(1) ScanV2 iceberg[timestamp#88, web#97, c_date#107] (Filters: [isnotnull(c_date#107)], Options: [path=adl://…/iceberg/,paths=[]])
>     >>
>     >> If we force the literal to be a DATE, then we get the right
>     >> partition pruning behavior:
>     >>
>     >>
>     >>
>     >> datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
>     >> spark.sql("""
>     >> SELECT …
>     >> FROM datePartitionedIcebergDf
>     >> WHERE c_date = CAST('2017-11-15' AS DATE)
>     >> GROUP BY minute1
>     >> LIMIT 10
>     >> """
>     >> ).show
>     >>
>     >> (1) Spark Jobs
>     >> Job 5 (3/3 stages)
>     >>   Stage 15: 15/15 succeeded/total tasks
>     >>   Stage 16: 200/200 succeeded/total tasks
>     >>   Stage 17: 1/1 succeeded/total tasks
>     >>
>     >>
>     >> And inspecting the physical plan, we confirm the correct behavior:
>     >>
>     >> == Physical Plan ==
>     >> CollectLimit 21
>     >> +- *(3) Project [cast(minute1#250 as string) AS minute1#258, cast(pageviews#251 as string) AS pageviews#259]
>     >>    +- *(3) GlobalLimit 10
>     >>       +- Exchange SinglePartition
>     >>          +- *(2) LocalLimit 10
>     >>             +- *(2) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC))#266], functions=[finalmerge_sum(merge sum#265) AS sum(value#262)#253], output=[minute1#250, pageviews#251])
>     >>                +- Exchange hashpartitioning(minute(timestamp#88, Some(Etc/UTC))#266, 200)
>     >>                   +- *(1) HashAggregate(keys=[minute(timestamp#88, Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#266], functions=[partial_sum(value#262) AS sum#265], output=[minute(timestamp#88, Some(Etc/UTC))#266, sum#265])
>     >>                      +- *(1) Project [timestamp#88, web#97.webPageDetails.pageViews.value AS value#262]
>     >>                         +- *(1) Filter ((c_date#107 = 17485) && isnotnull(c_date#107))
>     >>                            +- *(1) ScanV2 iceberg[timestamp#88, web#97, c_date#107] (Filters: [(c_date#107 = 17485), isnotnull(c_date#107)], Options: [path=adl://…/iceberg/,paths=[]])
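>     >>
>     >> (Here 17485 is 2017-11-15 expressed as days since the Unix epoch,
>     >> Spark's internal representation for DATE values, so the pushed filter
>     >> is the date equality we wanted.)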
>     >>
>     >>
>     >> Discussion:
>     >>
>     >> The original Spark filter is of the form:
>     >>
>     >> CAST(date AS STRING) OP STRING_LITERAL
>     >>
>     >> which Iceberg, today, doesn't understand, since it does not support
>     >> CAST expressions. Thus a full table scan is done for my original
>     >> query. The problem is, it's very common for folks to use literals to
>     >> express DATEs. At first I thought, why on earth would Spark phrase the
>     >> predicate like that, since it's way cheaper to evaluate this:
>     >>
>     >> c_date = CAST('2017-11-15' AS DATE)
>     >>
>     >> But it turns out that that phrasing would break pre-established
>     >> semantics:
>     >> https://github.com/apache/spark/pull/17174#discussion_r104401305
>     >>
>     >>
>     >> So we have two options, I think:
>     >>
>     >> 1) Iceberg, upon seeing a CAST(date AS STRING) OP STRING_LITERAL, can
>     >> try to rephrase and resolve it in SparkExpressions. As per the
>     >> discussion in the Spark thread above, I think it's a safe conversion
>     >> if we are strict in matching the predicate to be a comparison between
>     >> a DATE and a literal. (See the sketch after the discussion below.)
>     >>
>     >> or
>     >>
>     >> 2) Iceberg should start supporting CAST predicates.
>     >>
>     >>
>     >> (1) is way cheaper and less invasive, but I wonder whether we will
>     >> see other instances of similar behavior, or whether an incoming
>     >> predicate will be just a bit more complicated so that approach (1)
>     >> misses it, and we should thus just bite the bullet and go for (2).
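>     >>
>     >> To illustrate (1), the match could be as narrow as this. (A sketch
>     >> only, against Catalyst internals; the rephrase() name and the format
>     >> handling are hypothetical.)
>     >>
>     >>   import java.sql.Date
>     >>   import org.apache.spark.sql.catalyst.expressions._
>     >>   import org.apache.spark.sql.types.{DateType, StringType}
>     >>
>     >>   // CAST(date_col AS STRING) = 'yyyy-MM-dd'  ==>  date_col = DATE 'yyyy-MM-dd'
>     >>   // Only fires for a bare DATE attribute compared to a string literal;
>     >>   // real code would validate the literal's format rather than letting
>     >>   // Date.valueOf throw on a non-date string.
>     >>   def rephrase(e: Expression): Expression = e match {
>     >>     case EqualTo(Cast(attr: Attribute, StringType, _), Literal(s, StringType))
>     >>         if attr.dataType == DateType =>
>     >>       EqualTo(attr, Literal(Date.valueOf(s.toString)))
>     >>     case other => other
>     >>   }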
>     >>
>     >>
>     >> Comments/suggestions?
>     >>
>     >>
>     >> Xabriel J Collazo Mojica  |  Senior Software Engineer  |  Adobe  |
>     >> xcoll...@adobe.com
>     >>
>     >
>     >
>     > --
>     > Ryan Blue
>     > Software Engineer
>     > Netflix
>     >
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix
