Hi Yanal! Yes, exactly. I read from a CSV file and convert it to a DataFrame with column names. It looks like this:

val eventDF = sc.textFile(eventFile)
  .map(_.split(","))
  .filter(_.size >= 6)
  .map { e => ... }  // build the row fields here
  .toDF(eventTableColumns: _*)
  .cache()
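One note on that split call (an aside from me, not something raised earlier in this thread): Java's String.split(",") silently drops trailing empty fields, which is one way rows can end up with missing values in the later columns. For example:

"a,b,,".split(",").length      // 2: trailing empty fields are dropped
"a,b,,".split(",", -1).length  // 4: a negative limit keeps them

So a row can pass the filter(_.size >= 6) guard and still be missing trailing fields, which can translate into null or empty values depending on how the row is built.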
The result of the <=> function is:

scala> eventDF.filter($"entityType" <=> "user").select("entityId").distinct.count
res25: Long = 2091

As you mentioned, it seems related to the nullable columns. Using a case class works as expected; it is the best workaround so far.

2015-11-06 19:01 GMT+09:00 Yanal Wazaefi <yanal.waza...@kelkoo.com>:

> Hi Sondoku,
>
> Are you converting an event RDD with the toDF("id", "event", "entityType",
> "entityId", "targetEntityType", "targetEntityId", "properties") function
> to get your eventDF?
>
> Does <=> give you the correct result too (2091)?
> eventDF.filter($"entityType" <=> "user").select("entityId").distinct.count
>
> I had the same problem with DataFrame equality when using the
> toDF("col1", "col2", ...) function.
>
> To work around this problem (bug?), I used a *case class* and then applied
> the toDF() function. Something like this in your case:
>
> case class Event(id: String, event: String, entityType: String, entityId: String,
>   targetEntityType: String, targetEntityId: String, properties: String)
>
> eventRDD.map { case (id, event, entityType, entityId, targetEntityType,
>   targetEntityId, properties) =>
>   Event(id, event, entityType, entityId, targetEntityType, targetEntityId, properties)
> }.toDF()
>
> The === comparison should work in this case.
>
> The problem, I think, comes from some null values in the columns that come
> before the user column (e.g. id or event).
>
> Yanal
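For later readers, here is a self-contained version of that workaround as I understand it. This is a sketch only: the seven fields mirror the schema from my original mail, and the parsing details (the negative split limit, the >= 7 guard) are my assumptions, not Yanal's code.

import sqlContext.implicits._  // already in scope in spark-shell

// An explicit case class pins down the schema instead of relying on
// tuple-based inference, which is where the wrong counts showed up.
case class Event(id: String, event: String, entityType: String,
                 entityId: String, targetEntityType: String,
                 targetEntityId: String, properties: String)

val eventDF = sc.textFile(eventFile)
  .map(_.split(",", -1))   // negative limit: keep trailing empty fields
  .filter(_.size >= 7)
  .map(f => Event(f(0), f(1), f(2), f(3), f(4), f(5), f(6)))
  .toDF()
  .cache()

// === now returns the expected count; <=> (null-safe equality,
// which never evaluates to NULL) should agree with it.
eventDF.filter($"entityType" === "user").select("entityId").distinct.count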
> Subject: Re: DataFrame equality does not working in 1.5.1
> Date: Fri, 6 Nov 2015 02:14:18 +0100
> From: Seongduk Cheon <s.c...@opt.ne.jp>
> To: Yin Huai <yh...@databricks.com>
> CC: user <user@spark.apache.org>
>
> Hi, Yin
>
> Thanks for your time. This is the result.
>
> ------------------
> scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.explain(true)
> == Parsed Logical Plan ==
> Aggregate [entityId#16], [entityId#16]
>  Project [entityId#16]
>   Filter (entityType#15 = user)
>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]
>     LogicalRDD [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12], MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>
> == Analyzed Logical Plan ==
> entityId: string
> Aggregate [entityId#16], [entityId#16]
>  Project [entityId#16]
>   Filter (entityType#15 = user)
>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]
>     LogicalRDD [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12], MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>
> == Optimized Logical Plan ==
> Aggregate [entityId#16], [entityId#16]
>  Project [entityId#16]
>   Filter (entityType#15 = user)
>    InMemoryRelation [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]), None
>
> == Physical Plan ==
> TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>  TungstenExchange hashpartitioning(entityId#16)
>   TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>    Project [entityId#16]
>     Filter (entityType#15 = user)
>      InMemoryColumnarTableScan [entityId#16,entityType#15], [(entityType#15 = user)], (InMemoryRelation [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]), None)
>
> Code Generation: true
>
> scala>
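(A side observation on this plan: the physical plan shows the predicate pushed into InMemoryColumnarTableScan, i.e. the filter is evaluated against the cached copy of the DataFrame. A quick way to check whether the cache is implicated, which is my assumption and not something verified in this thread, is to drop the cached copy and re-run the count:)

// Drop the in-memory columnar copy built by .cache()
eventDF.unpersist()

// With the cache gone, the filter runs against the source data.
// If this returns 2091 while the cached version returned 1219, the
// wrong result comes from the filter pushed into the cached scan.
eventDF.filter($"entityType" === "user").select("entityId").distinct.count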
>
> 2015-11-06 5:27 GMT+09:00 Yin Huai <yh...@databricks.com>:
>
>> Can you attach the result of
>> eventDF.filter($"entityType" === "user").select("entityId").distinct.explain(true)?
>>
>> Thanks,
>>
>> Yin
>>
>> On Thu, Nov 5, 2015 at 1:12 AM, 千成徳 <s.c...@opt.ne.jp> wrote:
>>
>>> Hi All,
>>>
>>> I have a data frame like this.
>>>
>>> The equality expression does not work in 1.5.1 but works as expected in 1.4.0.
>>> What is the difference?
>>>
>>> scala> eventDF.printSchema()
>>> root
>>>  |-- id: string (nullable = true)
>>>  |-- event: string (nullable = true)
>>>  |-- entityType: string (nullable = true)
>>>  |-- entityId: string (nullable = true)
>>>  |-- targetEntityType: string (nullable = true)
>>>  |-- targetEntityId: string (nullable = true)
>>>  |-- properties: string (nullable = true)
>>>
>>> scala> eventDF.groupBy("entityType").agg(countDistinct("entityId")).show
>>> +----------+------------------------+
>>> |entityType|COUNT(DISTINCT entityId)|
>>> +----------+------------------------+
>>> |   ib_user|                    4751|
>>> |      user|                    2091|
>>> +----------+------------------------+
>>>
>>> ----- does not work (bug?)
>>> scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.count
>>> res151: Long = 1219
>>>
>>> scala> eventDF.filter(eventDF("entityType") === "user").select("entityId").distinct.count
>>> res153: Long = 1219
>>>
>>> scala> eventDF.filter($"entityType" equalTo "user").select("entityId").distinct.count
>>> res149: Long = 1219
>>>
>>> ----- works as expected
>>> scala> eventDF.map { e => (e.getAs[String]("entityId"), e.getAs[String]("entityType")) }.filter(x => x._2 == "user").map(_._1).distinct.count
>>> res150: Long = 2091
>>>
>>> scala> eventDF.filter($"entityType" in "user").select("entityId").distinct.count
>>> warning: there were 1 deprecation warning(s); re-run with -deprecation for details
>>> res155: Long = 2091
>>>
>>> scala> eventDF.filter($"entityType" !== "ib_user").select("entityId").distinct.count
>>> res152: Long = 2091
>>>
>>> All of the above code works in 1.4.0.
>>>
>>> Thanks.

--
-------------------------------------------------------
Sondoku Chon (千 成徳)

Opt Holding, Inc.  http://www.opt.ne.jp/holding/
Data Science Lab  https://datasciencelab.jp/
Big Data Architect

Tokyu Bancho Building, 6 Yonbancho, Chiyoda-ku, Tokyo 102-0081
Tel: 080-4581-9708
Fax: 050-3156-7346
-------------------------------------------------------