I agree that 'non-deterministic' is the right term for what it currently
does: it marks an expression as non-deterministic (one that may return
different values for the same input, e.g. rand()), and the optimizer does
its best not to break semantics when moving expressions around.
In the case of expensive deterministic UDFs, or any expensive deterministic
expression, the optimizer should not duplicate the work. Even for cheap
expressions like a * 5, where the performance impact is comparatively
small, it simply should not evaluate the same thing twice. So this is not
about expensive deterministic expressions, but about deterministic
expressions that are referenced multiple times.
Pushing those expressions into the expressions that reference them is
useful when it simplifies the referencing expressions, e.g.
df.withColumn("b", not($"a")).where(not($"b")) will eliminate the double
negation of a.
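A minimal sketch of that double-negation case (spark-shell, made-up data
and column name):

import org.apache.spark.sql.functions.not
import spark.implicits._

val df = Seq(true, false).toDF("a")

// "b" is not(a) and the filter references it as not(b). Once the
// projection is collapsed into the filter, the condition becomes
// not(not(a)), which the optimizer simplifies to a plain filter on a.
df.withColumn("b", not($"a"))
  .where(not($"b"))
  .explain(true)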
So expressions that are referenced multiple times should not be collapsed,
unless the referencing expressions get simplified as a result. And even
then, the simplification must pay off against evaluating the referenced
expression twice. Deciding this needs some kind of cost model, or at least
heuristics.
In the case of UDFs, I think they should never be collapsed, because they
cannot be used to simplify other expressions (can they?). They should
rather be materialised as close to the first reference as possible. If
executing the UDF and referencing its result multiple times happen in the
same stage, and hence in the same generated code, we end up with the
perfect situation: the materialised result of each call is held in memory
and consumed by all referencing expressions.
Marking UDFs as expensive is not the right approach here, I agree; they
should simply not be executed multiple times.
Enrico
On 08.11.19 at 04:26, Wenchen Fan wrote:
We really need some documentation to define what non-deterministic means.
AFAIK, non-deterministic expressions may produce a different result
for the same input row, if the already processed input rows are
different.
The optimizer tries its best not to change the input sequence
of non-deterministic expressions. For example, `df.select(...,
nonDeterministicExpr).filter...` can't do filter pushdown. An
exception is filter conditions: for `df.filter(nonDeterministic &&
cond)`, Spark still pushes down `cond`, even if that may change the input
sequence seen by the first condition. This is to respect the SQL semantics
that filter conditions ANDed together are order-insensitive. Users
should write `df.filter(nonDeterministic).filter(cond)` to guarantee
the order.
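A hedged sketch of the two filter styles (rand() stands in for any
non-deterministic expression; the DataFrame and column are made up):

import org.apache.spark.sql.functions.rand
import spark.implicits._

val df = spark.range(100).toDF("cond")

// ANDed conditions are treated as order-insensitive, so Spark is free
// to reorder or push down the deterministic part:
df.filter(rand() > 0.5 && $"cond" === 1).explain(true)

// Chained filters guarantee the non-deterministic predicate is applied first:
df.filter(rand() > 0.5).filter($"cond" === 1).explain(true)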
For this particular problem, I think it's not only about UDFs, but a
general problem with how Spark collapses Projects.
For example, Spark optimizes `df.select('a * 5 as 'b).select('b + 2, 'b + 3)`
to `df.select('a * 5 + 2, 'a * 5 + 3)` and executes 'a * 5 twice.
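A minimal, runnable sketch of that example (spark-shell, column name
assumed; $"..." syntax used instead of symbols):

import spark.implicits._

val df = spark.range(10).toDF("a")

df.select(($"a" * 5).as("b"))
  .select($"b" + 2, $"b" + 3)
  .explain(true)
// The optimized plan should show a single Project computing both
// ((a * 5) + 2) and ((a * 5) + 3), i.e. a * 5 is evaluated twice per row.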
I think we should revisit this optimization and think about when we
can collapse.
On Thu, Nov 7, 2019 at 6:20 PM Rubén Berenguel <rbereng...@gmail.com> wrote:
That was very interesting, thanks Enrico.
Sean, IIRC it also prevents push down of the UDF in Catalyst in
some cases.
Regards,
Ruben
> On 7 Nov 2019, at 11:09, Sean Owen <sro...@gmail.com> wrote:
>
> Interesting, what does non-deterministic do except have this effect?
> Aside from the naming, it could be a fine use of this flag if that's
> all it effectively does. I'm not sure I'd introduce another flag with
> the same semantics just over naming. If anything, 'expensive' also
> isn't the right word; more like 'try not to evaluate multiple times'.
>
> Why isn't caching the answer? I realize it's big, but you can cache to
> disk. This may be faster than whatever plan reordering has to happen
> to evaluate once.
>
> Usually I'd say: can you redesign your UDF and code to be more
> efficient too? Or use a bigger cluster, if that's really what you need.
>
> At first look, no, I don't think this Spark-side naming workaround
> for your use case is worthwhile. There are existing, better solutions.
>
> On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack <m...@enrico.minack.dev> wrote:
>>
>> Hi all,
>>
>> Running expensive deterministic UDFs that return complex types,
>> followed by multiple references to those results, causes Spark to
>> evaluate the UDF multiple times per row. This has been reported
>> and discussed before: SPARK-18748, SPARK-17728
>>
>> val f: Int => Array[Int]
>> val udfF = udf(f)
>> df
>>   .select($"id", udfF($"id").as("array"))
>>   .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>>
>> A common approach to make Spark evaluate the UDF only once is
>> to cache the intermediate result right after projecting the UDF:
>>
>> df
>>   .select($"id", udfF($"id").as("array"))
>>   .cache()
>>   .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>>
>> There are scenarios where this intermediate result is too big
>> for the cluster to cache. Also, this is bad design.
>>
>> The best approach is to mark the UDF as non-deterministic. Then
>> Spark optimizes the query in a way that the UDF gets called only
>> once per row, exactly what you want:
>>
>> val udfF = udf(f).asNondeterministic()
>>
>> However, stating that a UDF is non-deterministic even though it clearly
>> is deterministic is counter-intuitive and makes your code harder
>> to read.
>>
>> Spark should provide a better way to flag the UDF. Calling it
>> expensive would be a better name here:
>>
>> val udfF = udf(f).asExpensive()
>>
>> I understand that deterministic is a notion that Expression
>> provides, and there is no equivalent to expensive that is
>> understood by the optimizer. However, asExpensive() could
>> just set ScalaUDF.udfDeterministic = deterministic &&
>> !expensive, which implements the best available approach behind a
>> better name.
>>
>> What are your thoughts on asExpensive()?
>>
>> Regards,
>> Enrico