Does nobody have an answer?
Another thing I've seen is that if I have no documents at all:
scala> df.select(explode(df("addresses.id")).as("aid")).collect
res27: Array[org.apache.spark.sql.Row] = Array()
Then this fails:
scala> df.select(explode(df("addresses.id")).as("aid"), df("id"))
org.apache.spark.sql.AnalysisException: Cannot resolve column name "id" among
(adresses);
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
Is there a better way to query nested objects, and to join a DF containing
nested objects with another regular DataFrame? (Yes, that's my current case.)
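
For the archives, here is a variant I'm experimenting with (just a sketch: it
assumes the usual spark-shell imports, and I haven't verified whether it
behaves any better on an empty DataFrame). Flattening with withColumn keeps
all the top-level columns in scope instead of re-resolving them in the select:

scala> import org.apache.spark.sql.functions.explode
scala> // flatten the nested ids while keeping the top-level columns
scala> val exploded = df.withColumn("aid", explode(df("addresses.id")))
scala> exploded.join(df_input, exploded("aid") === df_input("id")).select(exploded("id")).collect
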
> On May 9, 2016, at 00:42, Cyril Scetbon <[email protected]> wrote:
>
> Hi Ashish,
>
> The issue is not related to converting an RDD to a DF. I did that; I was just
> asking if I should do it differently.
>
> The issue is the exception raised when using array_contains with a sql.Column
> instead of a literal value.
>
> I found another way to do it using explode, as follows:
>
> df.select(explode(df("addresses.id")).as("aid"), df("id")).join(df_input,
> $"aid" === df_input("id")).select(df("id"))
>
> However, I'm wondering whether it does roughly the same thing, or whether the
> query is different and worse in terms of performance.
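>
> One way to check would be to compare the plans Spark produces for each query
> (a sketch; explain(true) prints both the logical and the physical plan):
>
> scala> val q = df.select(explode(df("addresses.id")).as("aid"), df("id")).join(df_input, $"aid" === df_input("id")).select(df("id"))
> scala> q.explain(true)  // shows the generate + join Spark actually plans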
>
> It would be great if someone could comment on it and maybe give me some advice.
>
> Thank you.
>
>> On May 8, 2016, at 22:12, Ashish Dubey <[email protected]> wrote:
>>
>> Is there any reason you don't want to convert this? I don't think a join
>> between an RDD and a DF is supported.
>>
>> On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon <[email protected]> wrote:
>> Hi,
>>
>> I have an RDD built during a Spark Streaming job and I'd like to join it to a
>> DataFrame (E/S input) to enrich it.
>> It seems that I can't join the RDD and the DF without first converting the
>> RDD to a DF (tell me if I'm wrong). Here are the schemas of both DFs:
>>
>> scala> df
>> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses:
>> array<struct<sf1:int,sf2:string,sf3:string,id:string>>, id: string]
>>
>> scala> df_input
>> res33: org.apache.spark.sql.DataFrame = [id: string]
>>
>> scala> df_input.collect
>> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])
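>>
>> For reference, the conversion that produced df_input looks roughly like this
>> (a sketch, assuming the streaming RDD "rdd" holds plain id strings; Input is
>> a hypothetical case class introduced just to name the column):
>>
>> import sqlContext.implicits._  // brings .toDF into scope for RDDs of case classes
>> case class Input(id: String)
>> val df_input = rdd.map(Input(_)).toDF()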
>>
>> I can get the ids I want if I know the value to look for in addresses.id,
>> using:
>>
>> scala> df.filter(array_contains(df("addresses.id"), "idaddress2")).select("id").collect
>> res35: Array[org.apache.spark.sql.Row] = Array([XXXX], [YY])
>>
>> However, when I try to join df_input and df, using the previous filter as
>> the join condition, I get an exception:
>>
>> scala> df.join(df_input, array_contains(df("addresses.id"), df_input("id")))
>> java.lang.RuntimeException: Unsupported literal type class
>> org.apache.spark.sql.Column id
>> at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
>> at org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
>> ...
>>
>> It seems that array_contains only supports literal (static) arguments and
>> does not accept a sql.Column in place of a value.
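>>
>> I believe the SQL form of array_contains accepts any expression for its
>> second argument (unlike the Scala function, which wraps the value in a
>> literal), so something like this untested sketch might work; input_id is a
>> rename I introduce to avoid the ambiguous "id" on both sides:
>>
>> import org.apache.spark.sql.functions.expr
>> val input = df_input.withColumnRenamed("id", "input_id")
>> // the SQL parser lets array_contains compare against a column
>> df.join(input, expr("array_contains(addresses.id, input_id)")).select(df("id"))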
>>
>> What's the best way to achieve what I want to do? (Also in terms of
>> performance.)
>>
>> Thanks
>>
>>
>