Sorry to jump into the thread, but I have a quick question.
You are right that CAST("2019-01-01" AS DATE) = date and "2019-01-01" =
CAST(date AS STRING) are semantically equivalent, but the first one implies
casting a single value whereas the second one implies casting all possible
values that we will encounter. Is this what is intended? Or will specific
processing engines (Spark, Presto) do the right here after Iceberg returns
back the right files?

El vie., 8 feb. 2019 a las 21:52, 'Ryan Blue' via Iceberg Developers (<
iceberg-de...@googlegroups.com>) escribió:

> Hi Xabriel,
> Part of the problem is that the Spark community doesn’t want to pass
> Expression to data source implementations because it is internal. From
> Spark’s perspective, this is a perfectly reasonable thing to do because it
> has a public API for passing some expressions to the sources, the Filter
> classes
> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.sources.Filter>
> .
>
> I think the right path forward is to update the conversion from Expression
> to Filter. CAST("2019-01-01" AS DATE) = date and "2019-01-01" = CAST(date
> AS STRING) are equivalent, so we should be able to convert to the right
> filter.
>
> We have other reasons to make this conversion better as well. For example,
> when partitioning by date_col = CAST(timestamp_col AS DATE), it is common
> for users to want to use an equality predicate, date_col = "2019-01-01".
> When that date_col is a hidden partition, PARTITION BY date(timestamp_col),
> users still want to use an equality predicate instead of a timestamp range.
> So it would be great if conversion could handle CAST(timestamp_col AS
> DATE) = "2019-01-01".
>
> I’ll open a PR on Spark to get these fixes in.
>
> rb
>
> On Thu, Feb 7, 2019 at 11:07 AM Xabriel Collazo Mojica
> <xcoll...@adobe.com.invalid> wrote:
>
>> Hello folks,
>>
>> I have been debugging this one for a bit now, so wanted to share my
>> findings, and ask for suggestions.
>>
>> I have a table with PartitionSpec like so:
>>
>> [
>>   batchId: identity(21)
>>   c_date: identity(22)
>> ]
>>
>> where c_date is a DATE.
>>
>> How many files/tasks would a full table scan scan?
>>
>> import com.netflix.iceberg.TableScan
>> import scala.collection.JavaConverters._
>>
>> val scan = iceTable.newScan()
>> var f = 0
>> scan.planFiles.asScala.foreach( p => f = f + 1 )
>> var t = 0
>> scan.planTasks.asScala.foreach( p => t = t + 1 )
>> println("files to scan: " + f)
>> println("tasks: " + t)
>>
>> files to scan: 29520
>> tasks: 2826
>>
>>
>> How many files would a table scan with a partition predicate scan?
>>
>>
>> import com.netflix.iceberg.TableScan
>> import scala.collection.JavaConverters._
>> import com.netflix.iceberg.expressions._
>>
>> val exp1 = Expressions.equal("c_date", "2017-11-15")
>>
>> val scan = iceTable.newScan().filter(exp1).filter(exp2)
>> var f = 0
>> scan.planFiles.asScala.foreach( p => f = f + 1 )
>> var t = 0
>> scan.planTasks.asScala.foreach( p => t = t + 1 )
>> println("files to scan: " + f)
>> println("tasks: " + t)
>>
>> files to scan: 164
>> tasks: 15
>>
>>
>> So iceberg-core and iceberg-api are doing the right thing, correctly
>> applying my predicate and thus pruning most of the work.
>>
>>
>> But what is the Spark Reader behavior?
>>
>> val datePartitionedIcebergDf = spark.read.format("iceberg").load("...")
>>
>> datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
>> spark.sql("""
>> SELECT …
>> WHERE …
>>   AND  c_date = '2017-11-15'
>> GROUP BY …
>> LIMIT 10
>> """
>> ).show
>>
>>
>> (1) Spark Jobs
>> Job 0 View(Stages: 3/3)
>> Stage 0:
>> 2826/2826
>> succeeded / total tasks
>> Stage 1:
>> 200/200
>> succeeded / total tasks
>> Stage 2:
>> 1/1
>> succeeded / total tasks
>>
>> Inspecting the physical plan, we see that the predicate does not get
>> pushed into iceberg, and that it is phrased in a surprising way by Spark
>> SQL:
>>
>> == Physical Plan ==
>> CollectLimit 21
>> +- *(3) Project [cast(minute1#227 as string) AS minute1#235,
>> cast(pageviews#228 as string) AS pageviews#236]
>>    +- *(3) GlobalLimit 10
>>       +- Exchange SinglePartition
>>          +- *(2) LocalLimit 10
>>             +- *(2) HashAggregate(keys=[minute(timestamp#88,
>> Some(Etc/UTC))#243], functions=[finalmerge_sum(merge sum#242) AS
>> sum(value#239)#230], output=[minute1#227, pageviews#228])
>>                +- Exchange hashpartitioning(minute(timestamp#88,
>> Some(Etc/UTC))#243, 200)
>>                   +- *(1) HashAggregate(keys=[minute(timestamp#88,
>> Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#243],
>> functions=[partial_sum(value#239) AS sum#242], output=[minute(timestamp#88,
>> Some(Etc/UTC))#243, sum#242])
>>                      +- *(1) Project [timestamp#88,
>> web#97.webPageDetails.pageViews.value AS value#239]
>>                         +- *(1) Filter ((cast(c_date#107 as string) =
>> 2017-11-15) && isnotnull(c_date#107))
>>                            +- *(1) ScanV2 iceberg[timestamp#88, web#97,
>> c_date#107] (Filters: [isnotnull(c_date#107)], Options:
>> [path=adl://…/iceberg/,paths=[]])
>>
>> If we force the literal to be a DATE, then we get the right partition
>> pruning behavior:
>>
>>
>> datePartitionedIcebergDf.createOrReplaceTempView("datePartitionedIcebergDf")
>> spark.sql("""
>> SELECT …
>> FROM datePartitionedIcebergDf
>> WHERE c_date = CAST('2017-11-15' AS DATE)
>> GROUP BY minute1
>> LIMIT 10
>> """
>> ).show
>>
>> (1) Spark Jobs
>> Job 5 View(Stages: 3/3)
>> Stage 15:
>> 15/15
>> succeeded / total tasks
>> Stage 16:
>> 200/200
>> succeeded / total tasks
>> Stage 17:
>> 1/1
>> succeeded / total tasks
>>
>>
>> And inspecting the physical plan we confirm correct behavior:
>>
>> == Physical Plan ==
>> CollectLimit 21
>> +- *(3) Project [cast(minute1#250 as string) AS minute1#258,
>> cast(pageviews#251 as string) AS pageviews#259]
>>    +- *(3) GlobalLimit 10
>>       +- Exchange SinglePartition
>>          +- *(2) LocalLimit 10
>>             +- *(2) HashAggregate(keys=[minute(timestamp#88,
>> Some(Etc/UTC))#266], functions=[finalmerge_sum(merge sum#265) AS
>> sum(value#262)#253], output=[minute1#250, pageviews#251])
>>                +- Exchange hashpartitioning(minute(timestamp#88,
>> Some(Etc/UTC))#266, 200)
>>                   +- *(1) HashAggregate(keys=[minute(timestamp#88,
>> Some(Etc/UTC)) AS minute(timestamp#88, Some(Etc/UTC))#266],
>> functions=[partial_sum(value#262) AS sum#265], output=[minute(timestamp#88,
>> Some(Etc/UTC))#266, sum#265])
>>                      +- *(1) Project [timestamp#88,
>> web#97.webPageDetails.pageViews.value AS value#262]
>>                         +- *(1) Filter ((c_date#107 = 17485) &&
>> isnotnull(c_date#107))
>>                            +- *(1) ScanV2 iceberg[timestamp#88, web#97,
>> c_date#107] (Filters: [(c_date#107 = 17485), isnotnull(c_date#107)],
>> Options: [path=adl://…/iceberg/,paths=[]])
>>
>>
>> Discussion:
>>
>> The original Spark filter is of the form:
>>
>> CAST( date AS STRING) OP STRING_LITERAL
>>
>> Which Iceberg, today, doesn’t understand since it does not support CAST
>> expressions. Thus a full table scan is done for my original query. Problem
>> is, it’s very common for folks to use literals to express DATEs. At first,
>> I thought, why on earth would Spark phrase the predicate like that, since
>> its way cheaper to evaluate this:
>>
>> c_date = CAST(‘2017-11-15’ AS DATE)
>>
>> But it turns out that that phrasing would break pre-establish semantics:
>> https://github.com/apache/spark/pull/17174#discussion_r104401305
>>
>>
>> So we have two options I think:
>>
>> 1) Iceberg, upon seeing a CAST( date AS STRING) OP STRING_LITERAL can try
>> to rephrase and resolve it on SparkExpressions. As per discussion in Spark
>> thread above, I think it’s a safe conversion if we are strict on matching
>> the predicate to be a comparison between a DATE and a literal.
>>
>> or
>>
>> 2) Iceberg should start supporting CAST predicates.
>>
>>
>> (1) is way cheaper and less invasive, but I wonder if we will see other
>> instances of similar behavior, or if the incoming predicate is just a bit
>> more complicated and we would miss matching it with approach of (1), and
>> thus should just bite the bullet and go for (2).
>>
>>
>> Comments/Suggestions ?
>>
>>
>> Xabriel J Collazo Mojica  |  Senior Software Engineer  |  Adobe  |
>> xcoll...@adobe.com
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
> --
> You received this message because you are subscribed to the Google Groups
> "Iceberg Developers" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to iceberg-devel+unsubscr...@googlegroups.com.
> To post to this group, send email to iceberg-de...@googlegroups.com.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/iceberg-devel/CAO4re1kd%3DSXapjL6kaBZ7nu1p%2BFuTOYiKY4iTKbAtjx3%2Bjgx5A%40mail.gmail.com
> <https://groups.google.com/d/msgid/iceberg-devel/CAO4re1kd%3DSXapjL6kaBZ7nu1p%2BFuTOYiKY4iTKbAtjx3%2Bjgx5A%40mail.gmail.com?utm_medium=email&utm_source=footer>
> .
> For more options, visit https://groups.google.com/d/optout.
>

Reply via email to