Hi Alex,
You can cast the initial value to the desired type:

val mergeExpr = expr("aggregate(data, cast(map() as map<string, string>), (acc, i) -> map_concat(acc, i))")
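For reference, a sketch of the quoted example with that cast applied (same `df` as in the question below; the cast gives the zero value the `map<string,string>` type that `map_concat` and the aggregate's result both need):

```scala
import org.apache.spark.sql.functions.{expr, collect_list}

val df = Seq(
  (1, Map("k1" -> "v1", "k2" -> "v3")),
  (1, Map("k3" -> "v3")),
  (2, Map("k4" -> "v4")),
  (2, Map("k6" -> "v6", "k5" -> "v5"))
).toDF("id", "data")

// Casting the empty map fixes its type; without the cast, map() is
// typed as map<null,null> on Spark 3.x and the aggregate fails to resolve.
val mergeExpr = expr(
  "aggregate(data, cast(map() as map<string, string>), (acc, i) -> map_concat(acc, i))")

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeExpr.as("merged_data"))
  .show(false)
```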
On 8/27/22 13:06, Alexandros Biratsis wrote:
Hello folks,
I would like to ask the Spark devs whether it is possible to explicitly define
the key/value types of a map (Spark 3.3.0), as shown below:
import org.apache.spark.sql.functions.{expr, collect_list}

val df = Seq(
  (1, Map("k1" -> "v1", "k2" -> "v3")),
  (1, Map("k3" -> "v3")),
  (2, Map("k4" -> "v4")),
  (2, Map("k6" -> "v6", "k5" -> "v5"))
).toDF("id", "data")

val mergeExpr = expr("aggregate(data, map(), (acc, i) -> map_concat(acc, i))")

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeExpr.as("merged_data"))
  .show(false)
The above code throws the following error:
AnalysisException: cannot resolve 'aggregate(`data`, map(),
lambdafunction(map_concat(namedlambdavariable(),
namedlambdavariable()), namedlambdavariable(),
namedlambdavariable()), lambdafunction(namedlambdavariable(),
namedlambdavariable()))' due to data type mismatch: argument 3
requires map<null,null> type, however,
'lambdafunction(map_concat(namedlambdavariable(),
namedlambdavariable()), namedlambdavariable(),
namedlambdavariable())' is of map<string,string> type.;
Project [id#110, aggregate(data#119, map(),
lambdafunction(map_concat(cast(lambda acc#122 as
map<string,string>), lambda i#123), lambda acc#122, lambda i#123,
false), lambdafunction(lambda id#124, lambda id#124, false)) AS
aggregate(data, map(),
lambdafunction(map_concat(namedlambdavariable(),
namedlambdavariable()), namedlambdavariable(),
namedlambdavariable()), lambdafunction(namedlambdavariable(),
namedlambdavariable()))#125]
+- Aggregate [id#110], [id#110, collect_list(data#111, 0, 0) AS data#119]
   +- Project [_1#105 AS id#110, _2#106 AS data#111]
      +- LocalRelation [_1#105, _2#106]
It seems that map() is initialised as map<null,null> when
map<string,string> is expected. I believe this behaviour has changed
since 2.4.5, where map() was initialised as map<string,string>, and the
previous example was working.
Is it possible to create a map by specifying the key-value type explicitly?
So far, I came up with a workaround: using map('', '') to initialise the
map with string key/value types, then using map_filter() to remove the
redundant ('', '') entry:

val mergeExpr = expr("map_filter(aggregate(data, map('', ''), (acc, i) -> map_concat(acc, i)), (k, v) -> k != '')")
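As a further sketch (not part of the original thread): on Spark 3.0+ the Scala
column API offers `aggregate` and `typedLit` in `org.apache.spark.sql.functions`,
and `typedLit` infers `map<string,string>` from the Scala type, so neither the
SQL cast nor the map_filter workaround is needed:

```scala
import org.apache.spark.sql.functions.{aggregate, col, collect_list, map_concat, typedLit}

// typedLit(Map.empty[String, String]) produces a zero value that is
// already typed as map<string,string>, so the aggregate resolves cleanly.
val mergeCol = aggregate(
  col("data"),
  typedLit(Map.empty[String, String]),  // explicitly typed initial value
  (acc, i) => map_concat(acc, i)
)

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeCol.as("merged_data"))
  .show(false)
```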
Thank you for your help
Greetings,
Alex
--
Best regards,
Maciej Szymkiewicz
Web: https://zero323.net
PGP: A30CEF0C31A501EC