Hi,
I have an RDD built during a Spark Streaming job and I'd like to join it to a
DataFrame (E/S input) to enrich it.
It seems that I can't join the RDD and the DataFrame without first converting
the RDD to a DataFrame (tell me if I'm wrong; a sketch of my current conversion
follows the schemas below). Here are the schemas of both DataFrames:
scala> df
res32: org.apache.spark.sql.DataFrame = [f1: string, addresses: array<struct<sf1:int,sf2:string,sf3:string,id:string>>, id: string]
scala> df_input
res33: org.apache.spark.sql.DataFrame = [id: string]
scala> df_input.collect
res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])
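For context, df_input is built from the streaming RDD roughly as below. This is
only a sketch of my current code: the stream of plain id strings and the
InputId case class are illustrative, not my real names.

  import org.apache.spark.sql.SQLContext

  case class InputId(id: String)

  stream.foreachRDD { rdd =>               // stream is assumed to be a DStream[String]
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    // wrap each id in a case class so toDF() can infer the schema [id: string]
    val df_input = rdd.map(InputId(_)).toDF()
    // ... enrichment with df happens here
  }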
I can get the ids I want when I know the literal value to look for in
addresses.id, using:
scala> df.filter(array_contains(df("addresses.id"), "idaddress2")).select("id").collect
res35: Array[org.apache.spark.sql.Row] = Array([XXXX], [YY])
However, when I try to join df_input and df using the previous filter as the
join condition, I get an exception:
scala> df.join(df_input, array_contains(df("addresses.id"), df_input("id")))
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.Column id
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
  at org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
  ...
It seems that array_contains only accepts a static (literal) value as its
second argument and cannot take a sql.Column whose value is resolved per row.
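The only alternative I can think of is to explode the array first and then join
on plain equality, along the lines of the untested sketch below (doc_id and
addr_id are just illustrative aliases):

  import org.apache.spark.sql.functions.explode

  // flatten addresses.id so that each array element becomes its own row,
  // then join on equality instead of array_contains
  val flat = df.select(df("id").as("doc_id"), explode(df("addresses.id")).as("addr_id"))
  val enriched = flat.join(df_input, flat("addr_id") === df_input("id")).select("doc_id")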
What's the best way to achieve this, also in terms of performance?
Thanks