I agree that 'non-deterministic' is the right term for what the flag currently does: it marks an expression as non-deterministic (one that returns different values for the same input, e.g. rand()), and the optimizer does its best not to break semantics when moving such expressions around.

For expensive deterministic UDFs, or any expensive deterministic expression, the optimizer should not multiply the effort. Even for cheap expressions like a * 5, where the performance impact is comparatively small, it simply should not evaluate the same thing twice. So this is not about expensive deterministic expressions, but about deterministic expressions that are referenced multiple times.
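To see the double evaluation, here is a minimal sketch for the spark-shell (names are illustrative; the AtomicLong counter is only reliable in local mode, where driver and executors share one JVM):

    import java.util.concurrent.atomic.AtomicLong
    import org.apache.spark.sql.functions.{col, udf}

    val calls = new AtomicLong(0)
    // deterministic, but we count how often it actually runs
    val times5 = udf((i: Long) => { calls.incrementAndGet(); i * 5 })

    spark.range(1).toDF("a")
      .select(times5(col("a")).as("b"))
      .select((col("b") + 2).as("c"), (col("b") + 3).as("d"))
      .collect()

    calls.get()  // 2, not 1: the Projects were collapsed, so the UDF ran twice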

Pushing such expressions into the expressions that reference them is useful when it simplifies the referencing expressions: e.g. in df.withColumn("b", not($"a")).where(not($"b")), inlining b eliminates the double negation of a.
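A minimal sketch of that simplification (spark-shell, illustrative column names):

    import org.apache.spark.sql.functions.{col, not}

    val df = spark.range(10).toDF("x").select((col("x") > 5).as("a"))
    df.withColumn("b", not(col("a")))
      .where(not(col("b")))
      .explain()
    // the plan filters on a directly: not(not(a)) has been simplified away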

So expressions that are referenced multiple times should not be collapsed, unless the referencing expressions get simplified, and that simplification must outweigh the cost of evaluating the referenced expression twice. This needs some kind of cost model, or at least heuristics.

UDFs, I think, should never be collapsed, because they cannot be used to simplify other expressions (can they?). They should instead be materialised as close to the first reference as possible. If calling the UDF and referencing its result multiple times happens in the same stage, and hence in the same generated code, we end up with the ideal situation: the result of each call is held in memory and consumed by all referencing expressions.

Marking UDFs as expensive is not the right approach here, I agree; they should simply not be executed multiple times.

Enrico


On 08.11.19 at 04:26, Wenchen Fan wrote:
We really need some documentation to define what non-deterministic means. AFAIK, non-deterministic expressions may produce a different result for the same input row if the previously processed input rows are different.

The optimizer tries its best not to change the input sequence of non-deterministic expressions. For example, `df.select(..., nonDeterministicExpr).filter...` can't do filter pushdown. An exception is filter conditions: for `df.filter(nonDeterministic && cond)`, Spark still pushes down `cond`, even though it may change the input sequence of the first condition. This respects the SQL semantics that filter conditions ANDed together are order-insensitive. Users should write `df.filter(nonDeterministic).filter(cond)` to guarantee the order.
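A minimal sketch of the difference (the predicates are illustrative):

    import org.apache.spark.sql.functions.{col, rand}

    val df = spark.range(100).toDF("a")

    // cond may be pushed below the non-deterministic predicate,
    // changing the sequence of rows that rand() sees:
    df.filter(rand() < 0.5 && col("a") > 50)

    // order guaranteed: the rand() filter is applied to all rows first:
    df.filter(rand() < 0.5).filter(col("a") > 50)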

For this particular problem, I think it's not only about UDFs but a general problem with how Spark collapses Projects. For example, Spark optimizes `df.select('a * 5 as 'b).select('b + 2, 'b + 3)` to `df.select('a * 5 + 2, 'a * 5 + 3)` and executes `'a * 5` twice.
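This is easy to reproduce in the spark-shell (a minimal sketch equivalent to the example above):

    import org.apache.spark.sql.functions.col

    spark.range(10).toDF("a")
      .select((col("a") * 5).as("b"))
      .select((col("b") + 2).as("c"), (col("b") + 3).as("d"))
      .explain(true)
    // the optimized plan is a single Project with ((a * 5) + 2) AS c
    // and ((a * 5) + 3) AS d, i.e. a * 5 is computed twice per row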

I think we should revisit this optimization and think about when we can collapse.

On Thu, Nov 7, 2019 at 6:20 PM Rubén Berenguel <rbereng...@gmail.com> wrote:

    That was very interesting, thanks Enrico.

    Sean, IIRC it also prevents pushdown of the UDF in Catalyst in some cases.

    Regards,

    Ruben

    > On 7 Nov 2019, at 11:09, Sean Owen <sro...@gmail.com> wrote:
    >
    > Interesting, what does non-deterministic do except have this effect?
    > Aside from the naming, it could be a fine use of this flag if that's
    > all it effectively does. I'm not sure I'd introduce another flag with
    > the same semantics just over naming. If anything, 'expensive' also
    > isn't the right word; it's more like 'try not to evaluate multiple
    > times'.
    >
    > Why isn't caching the answer? I realize it's big, but you can cache
    > to disk. This may be faster than whatever plan reordering has to
    > happen to evaluate once.
    >
    > Usually I'd say: can you redesign your UDF and code to be more
    > efficient too? Or use a bigger cluster if that's really what you
    > need.
    >
    > At first look, no, I don't think this Spark-side workaround for
    > naming for your use case is worthwhile. There are existing better
    > solutions.
    >
    > On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack <m...@enrico.minack.dev> wrote:
    >>
    >> Hi all,
    >>
    >> Running expensive deterministic UDFs that return complex types,
    >> followed by multiple references to those results, causes Spark to
    >> evaluate the UDF multiple times per row. This has been reported
    >> and discussed before: SPARK-18748, SPARK-17728
    >>
    >>    // stand-in body; any expensive deterministic function would do
    >>    val f: Int => Array[Int] = i => Array(i, i + 1)
    >>    val udfF = udf(f)
    >>    df
    >>      .select($"id", udfF($"id").as("array"))
    >>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))
    >>
    >> A common approach to make Spark evaluate the UDF only once is to
    >> cache the intermediate result right after projecting the UDF:
    >>
    >>    df
    >>      .select($"id", udfF($"id").as("array"))
    >>      .cache()
    >>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))
    >>
    >> There are scenarios where this intermediate result is too big
    >> for the cluster to cache. It is also bad design.
    >>
    >> The best approach is to mark the UDF as non-deterministic. Then
    >> Spark optimizes the query in a way that the UDF gets called only
    >> once per row, which is exactly what you want.
    >>
    >>    val udfF = udf(f).asNondeterministic()
    >>
    >> However, stating that a UDF is non-deterministic when it clearly
    >> is deterministic is counter-intuitive and makes your code harder
    >> to read.
    >>
    >> Spark should provide a better way to flag the UDF. Calling it
    >> expensive would be better naming here:
    >>
    >>    val udfF = udf(f).asExpensive()
    >>
    >> I understand that deterministic is a notion that Expression
    >> provides, and there is no equivalent to expensive that is
    >> understood by the optimizer. However, asExpensive() could simply
    >> set ScalaUDF.udfDeterministic = deterministic && !expensive,
    >> which implements the best available approach behind better
    >> naming.
    >>
    >> What are your thoughts on asExpensive()?
    >>
    >> Regards,
    >> Enrico


    ---------------------------------------------------------------------
    To unsubscribe e-mail: dev-unsubscr...@spark.apache.org

