Hi Yanal!

Yes, exactly. I read from a CSV file and convert it to a DataFrame with column names.

It looks roughly like this:

val eventDF = sc.textFile(eventFile).map(_.split(",")).filter(_.size >= 6)
  .map { e => .... // do something
  }.toDF(eventTableColumns:_*).cache()


The result of the <=> operator is:

scala> eventDF.filter($"entityType" <=> "user").select("entityId").distinct.count
res25: Long = 2091
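
For reference, here is a small plain-Scala model of how the two operators treat null. It is only a sketch of standard SQL comparison semantics, not Spark's implementation: the names sqlEq, nullSafeEq and keptByFilter are made up for illustration, and Option stands in for a nullable column value.

```scala
object NullSafeEq {
  // Models ===: the result is unknown (None) when either side is null.
  def sqlEq(a: Option[String], b: Option[String]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y

  // Models <=>: never unknown; two nulls compare as equal.
  def nullSafeEq(a: Option[String], b: Option[String]): Boolean = a == b

  // A filter keeps a row only when the predicate is definitely true.
  def keptByFilter(result: Option[Boolean]): Boolean = result.getOrElse(false)
}
```

In this model both filters keep the same rows whenever the literal ("user" here) is non-null, so null handling in entityType alone would not seem to explain 1219 versus 2091.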

As you mentioned, it seems to be related to the nullable columns.
Using a case class works as expected. It is the best workaround so far.
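
A minimal sketch of the case class workaround, assuming the Spark 1.5 SQLContext API and the seven String columns from the printSchema output in this thread. The parse helper, the object name and the sample lines are made up for illustration; in spark-shell, sc and sqlContext already exist, so only the body of main would be needed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Seven String fields, matching the printSchema output quoted in this thread.
case class Event(id: String, event: String, entityType: String, entityId: String,
                 targetEntityType: String, targetEntityId: String, properties: String)

object EventEquality {
  // Hypothetical parser: returns None for lines with fewer than 7 fields.
  // A plain split(",") does not handle quoted or embedded commas.
  def parse(line: String): Option[Event] = {
    val f = line.split(",", -1) // -1 keeps trailing empty fields
    if (f.length >= 7) Some(Event(f(0), f(1), f(2), f(3), f(4), f(5), f(6)))
    else None
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("event-equality").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // enables toDF() and the $"col" syntax

    // Made-up sample lines standing in for the real event file.
    val lines = sc.parallelize(Seq(
      "e1,view,user,u1,item,i1,{}",
      "e2,view,user,u2,item,i2,{}",
      "e3,view,ib_user,u9,item,i1,{}",
      "short,line"))

    // Column names and types come from the case class instead of toDF(names: _*).
    val eventDF = lines.flatMap(line => parse(line).toSeq).toDF().cache()

    val viaEq = eventDF.filter($"entityType" === "user")
      .select("entityId").distinct.count
    val viaNullSafe = eventDF.filter($"entityType" <=> "user")
      .select("entityId").distinct.count
    println(s"=== : $viaEq, <=> : $viaNullSafe")

    sc.stop()
  }
}
```

Printing both counts side by side makes it easy to check whether === and <=> still disagree on a given Spark version.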



2015-11-06 19:01 GMT+09:00 Yanal Wazaefi <yanal.waza...@kelkoo.com>:

> Hi Sondoku,
>
> Are you converting an event RDD with toDF("id", "event", "entityType",
> "entityId", "targetEntityType", "targetEntityId", "properties") to get your
> eventDF?
>
> Does <=> give you the correct result too (2091)?
> eventDF.filter($"entityType" <=> "user").select("entityId").distinct.count
>
> I had the same problem with DataFrame equality when using the toDF("col1",
> "col2", ...) function.
>
> To work around this problem (bug?), I used a *case class* and then applied
> the toDF() function.
> Something like this in your case:
>
> case class Event(id: String, event: String, entityType: String,
>   entityId: String, targetEntityType: String, targetEntityId: String,
>   properties: String)
>
> eventRDD.map { case (id, event, entityType, entityId, targetEntityType,
>     targetEntityId, properties) =>
>   Event(id, event, entityType, entityId, targetEntityType, targetEntityId,
>     properties)
> }.toDF()
>
> The === comparison should work in this case.
>
> The problem (I think) comes from null values in the columns that come
> before the entityType column (e.g. id or event).
>
> Yanal
>
> Subject: Re: DataFrame equality does not working in 1.5.1
> Date: Fri, 6 Nov 2015 02:14:18 +0100
> From: Seongduk Cheon <s.c...@opt.ne.jp>
> To: Yin Huai <yh...@databricks.com>
> CC: user <user@spark.apache.org>
>
>
> Hi, Yin
>
> Thanks for your time. This is the result.
>
> ------------------
> scala> eventDF.filter($"entityType" ===
> "user").select("entityId").distinct.explain(true)
> == Parsed Logical Plan ==
> Aggregate [entityId#16], [entityId#16]
>  Project [entityId#16]
>   Filter (entityType#15 = user)
>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
> creationTimeZone#25]
>     LogicalRDD
> [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
> MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>
> == Analyzed Logical Plan ==
> entityId: string
> Aggregate [entityId#16], [entityId#16]
>  Project [entityId#16]
>   Filter (entityType#15 = user)
>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
> creationTimeZone#25]
>     LogicalRDD
> [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
> MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>
> == Optimized Logical Plan ==
> Aggregate [entityId#16], [entityId#16]
>  Project [entityId#16]
>   Filter (entityType#15 = user)
>    InMemoryRelation
> [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
> true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject
> [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
> creationTimeZone#25]), None
>
> == Physical Plan ==
> TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>  TungstenExchange hashpartitioning(entityId#16)
>   TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>    Project [entityId#16]
>     Filter (entityType#15 = user)
>      InMemoryColumnarTableScan [entityId#16,entityType#15],
> [(entityType#15 = user)], (InMemoryRelation
> [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
> true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject
> [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
> creationTimeZone#25]), None)
>
> Code Generation: true
>
> scala>
>
>
>
> 2015-11-06 5:27 GMT+09:00 Yin Huai <yh...@databricks.com>:
>
>> Can you attach the result of eventDF.filter($"entityType" ===
>> "user").select("entityId").distinct.explain(true)?
>>
>> Thanks,
>>
>> Yin
>>
>> On Thu, Nov 5, 2015 at 1:12 AM, 千成徳 <s.c...@opt.ne.jp> wrote:
>>
>>> Hi All,
>>>
>>> I have a DataFrame like this.
>>>
>>> The equality expression does not work in 1.5.1, but works as expected in
>>> 1.4.0.
>>> What is the difference?
>>>
>>> scala> eventDF.printSchema()
>>> root
>>>  |-- id: string (nullable = true)
>>>  |-- event: string (nullable = true)
>>>  |-- entityType: string (nullable = true)
>>>  |-- entityId: string (nullable = true)
>>>  |-- targetEntityType: string (nullable = true)
>>>  |-- targetEntityId: string (nullable = true)
>>>  |-- properties: string (nullable = true)
>>>
>>> scala> eventDF.groupBy("entityType").agg(countDistinct("entityId")).show
>>> +----------+------------------------+
>>> |entityType|COUNT(DISTINCT entityId)|
>>> +----------+------------------------+
>>> |   ib_user|                    4751|
>>> |      user|                    2091|
>>> +----------+------------------------+
>>>
>>>
>>> ----- does not work (bug?)
>>> scala> eventDF.filter($"entityType" ===
>>> "user").select("entityId").distinct.count
>>> res151: Long = 1219
>>>
>>> scala> eventDF.filter(eventDF("entityType") ===
>>> "user").select("entityId").distinct.count
>>> res153: Long = 1219
>>>
>>> scala> eventDF.filter($"entityType" equalTo
>>> "user").select("entityId").distinct.count
>>> res149: Long = 1219
>>>
>>> ----- works as expected
>>> scala> eventDF.map{ e => (e.getAs[String]("entityId"),
>>> e.getAs[String]("entityType")) }.filter(x => x._2 ==
>>> "user").map(_._1).distinct.count
>>> res150: Long = 2091
>>>
>>> scala> eventDF.filter($"entityType" in
>>> "user").select("entityId").distinct.count
>>> warning: there were 1 deprecation warning(s); re-run with -deprecation
>>> for details
>>> res155: Long = 2091
>>>
>>> scala> eventDF.filter($"entityType" !==
>>> "ib_user").select("entityId").distinct.count
>>> res152: Long = 2091
>>>
>>>
>>> But all of the above code works in 1.4.0.
>>>
>>> Thanks.
>>>
>>>
>>
>
>



-- 
-------------------------------------------------------
千 成徳 (Sondoku Chon)

株式会社オプトホールディング http://www.opt.ne.jp/holding/
データサイエンスラボ https://datasciencelab.jp/
ビックデータアーキテクト

〒102-0081 東京都千代田区四番町6東急番町ビル
Tel:080-4581-9708
Fax:050-3156-7346
-------------------------------------------------------
