Hi all,
Running an expensive deterministic UDF that returns a complex type, and then
referencing that result multiple times, causes Spark to evaluate the UDF
multiple times per row. This has been reported and discussed before:
SPARK-18748 and SPARK-17728
import org.apache.spark.sql.functions.udf

// placeholder body standing in for some expensive computation
val f: Int => Array[Int] = i => Array(i, i + 1)
val udfF = udf(f)

df
  .select($"id", udfF($"id").as("array"))
  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
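To make the double evaluation visible, here is a minimal self-contained
sketch; the local SparkSession and the LongAccumulator used as a call counter
are illustrative additions of mine (accumulator counts can be inflated by task
retries, but that does not matter for a demo):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// counts how often the UDF body actually runs
val calls = spark.sparkContext.longAccumulator("udf calls")
val f: Int => Array[Int] = { i => calls.add(1); Array(i, i + 1) }
val udfF = udf(f)

val df = Seq(1, 2, 3).toDF("id")
df
  .select($"id", udfF($"id").as("array"))
  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
  .collect()

// on affected versions this prints 6, i.e. two evaluations per row
println(calls.value)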
A common approach to make Spark evaluate the UDF only once is to cache
the intermediate result right after projecting the UDF:
df
  .select($"id", udfF($"id").as("array"))
  .cache()
  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
There are scenarios where this intermediate result is too big for the
cluster to cache. Caching solely to work around the double evaluation is
also bad design.
The best approach available today is to mark the UDF as non-deterministic.
Spark then plans the query so that the UDF is called only once per row,
which is exactly what you want:
val udfF = udf(f).asNondeterministic()
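Reusing the counter sketch from above (same assumed names), the difference
becomes observable:

calls.reset()
val udfFOnce = udf(f).asNondeterministic()

df
  .select($"id", udfFOnce($"id").as("array"))
  .select($"array"(0).as("array0"), $"array"(1).as("array1"))
  .collect()

// one evaluation per row
println(calls.value)  // 3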
*However, declaring a UDF non-deterministic when it clearly is
deterministic is counter-intuitive and makes your code harder to read.*
Spark should provide a better way to flag such a UDF. Calling it expensive
would be a better name here:
val udfF = udf(f).asExpensive()
I understand that deterministic is a notion that Expression provides, and
that there is no equivalent notion of expensive understood by the
optimizer. However, asExpensive() could simply set
ScalaUDF.udfDeterministic = deterministic && !expensive, which implements
the best available approach behind a better name.
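Until such an API exists, it can be emulated in user code. This is a
hypothetical shim (the RichUdf name is mine) that merely delegates to the
existing asNondeterministic():

import org.apache.spark.sql.expressions.UserDefinedFunction

// hypothetical extension method that gives asNondeterministic() the proposed name
implicit class RichUdf(val self: UserDefinedFunction) extends AnyVal {
  def asExpensive(): UserDefinedFunction = self.asNondeterministic()
}

val udfF = udf(f).asExpensive()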
What are your thoughts on asExpensive()?
Regards,
Enrico