Hi, Michael. It works fine.

scala> sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false")
res28: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.count
res29: Long = 2091
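For anyone else who hits this, a minimal sketch of setting the flag from application code instead of the shell (assuming the sqlContext and cached eventDF from this thread; setConf is the programmatic equivalent of the SET statement above):

import sqlContext.implicits._

// Disable in-memory columnar partition pruning to work around the wrong
// filter results on cached DataFrames (SPARK-10859).
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.partitionPruning", "false")

// The filter on the cached DataFrame now agrees with groupBy/countDistinct.
val userCount = eventDF.filter($"entityType" === "user")
  .select("entityId").distinct.count  // 2091 on our data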
scala> sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false") res28: org.apache.spark.sql.DataFrame = [key: string, value: string] scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.count res29: Long = 2091 Thank you so much for helping me. 2015-11-07 6:13 GMT+09:00 Michael Armbrust <mich...@databricks.com>: > In particular this is sounding like: > https://issues.apache.org/jira/browse/SPARK-10859 > > On Fri, Nov 6, 2015 at 1:05 PM, Michael Armbrust <mich...@databricks.com> > wrote: > >> I would be great if you could try sql("SET >> spark.sql.inMemoryColumnarStorage.partitionPruning=false") also, try Spark >> 1.5.2-RC2 >> <http://people.apache.org/~pwendell/spark-releases/spark-1.5.2-rc2-bin/> >> >> On Fri, Nov 6, 2015 at 4:49 AM, Seongduk Cheon <s.c...@opt.ne.jp> wrote: >> >>> Hi Yanal! >>> >>> Yes, exactly. I read from csv file and convert to DF with column names. >>> >>> simply look like this. >>> >>> val eventDF = sc.textFile(eventFile).map(_.split(",")).filter(_.size >= 6) >>> .map { e => .... // To do sometings >>> }.toDF(eventTableColumns:_*).cache() >>> >>> >>> The result of <=> function is >>> >>> scala> eventDF.filter($"entityType" <=> >>> "user").select("entityId").distinct.count >>> res25: Long = 2091 >>> >>> As you mentioned, It seems related to nullable column. >>> Using case class works as expected. It is one of the best workaround so >>> far. >>> >>> >>> >>> 2015-11-06 19:01 GMT+09:00 Yanal Wazaefi <yanal.waza...@kelkoo.com>: >>> >>>> Hi Sondoku, >>>> >>>> Are you converting an event RDD using toDF("id", "event", "entityType", >>>> "entityId", "targetEntityType", "targetEntityId", "properties") >>>> function to get your eventDF ? >>>> >>>> Does <=> give you the correct result too (2091) ? >>>> eventDF.filter($"entityType" <=> >>>> "user").select("entityId").distinct.count >>>> >>>> I had the same problem with the DataFrame equality, using toDF("col1", >>>> "col2", ...) function. >>>> >>>> To resolve this problem (bug?), I used a *case class* and then I >>>> applied toDF() function. >>>> Something like that in your case: >>>> >>>> case class Event(id: String, event: String, entityType: String, >>>> entityId: String, targetEntityType: String, targetEntityId: String, >>>> properties: String) >>>> eventRDD.map{case (id, event, entityType, entityId, targetEntityType, >>>> targetEntityId, >>>> properties) => >>>> Event(id, event, entityType, entityId, targetEntityType, targetEntityId, >>>> properties) }.toDF() >>>> >>>> The comparison === should work in this case. >>>> >>>> The problem (I think) comes from some null values in the columns that >>>> are before the column user (e.g. id or event). >>>> >>>> Yanal >>>> >>>> Subject: Re: DataFrame equality does not working in 1.5.1 Date: Fri, 6 >>>> Nov 2015 02:14:18 +0100 From: Seongduk Cheon <s.c...@opt.ne.jp> >>>> <s.c...@opt.ne.jp> To: Yin Huai <yh...@databricks.com> >>>> <yh...@databricks.com> CC: user <user@spark.apache.org> >>>> <user@spark.apache.org> >>>> >>>> >>>> Hi, Yin >>>> >>>> Thanks for your time. This is the result. 
>>>>
>>>> Yanal
>>>>
>>>> Subject: Re: DataFrame equality does not work in 1.5.1
>>>> Date: Fri, 6 Nov 2015 02:14:18 +0100
>>>> From: Seongduk Cheon <s.c...@opt.ne.jp>
>>>> To: Yin Huai <yh...@databricks.com>
>>>> CC: user <user@spark.apache.org>
>>>>
>>>> Hi, Yin
>>>>
>>>> Thanks for your time. This is the result.
>>>>
>>>> ------------------
>>>> scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.explain(true)
>>>> == Parsed Logical Plan ==
>>>> Aggregate [entityId#16], [entityId#16]
>>>>  Project [entityId#16]
>>>>   Filter (entityType#15 = user)
>>>>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]
>>>>     LogicalRDD [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12], MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>>>>
>>>> == Analyzed Logical Plan ==
>>>> entityId: string
>>>> Aggregate [entityId#16], [entityId#16]
>>>>  Project [entityId#16]
>>>>   Filter (entityType#15 = user)
>>>>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]
>>>>     LogicalRDD [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12], MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>>>>
>>>> == Optimized Logical Plan ==
>>>> Aggregate [entityId#16], [entityId#16]
>>>>  Project [entityId#16]
>>>>   Filter (entityType#15 = user)
>>>>    InMemoryRelation [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]), None
>>>>
>>>> == Physical Plan ==
>>>> TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>>>>  TungstenExchange hashpartitioning(entityId#16)
>>>>   TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>>>>    Project [entityId#16]
>>>>     Filter (entityType#15 = user)
>>>>      InMemoryColumnarTableScan [entityId#16,entityType#15], [(entityType#15 = user)], (InMemoryRelation [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]), None)
>>>>
>>>> Code Generation: true
>>>>
>>>> scala>
>>>>
>>>> 2015-11-06 5:27 GMT+09:00 Yin Huai <yh...@databricks.com>:
>>>>
>>>>> Can you attach the result of eventDF.filter($"entityType" ===
>>>>> "user").select("entityId").distinct.explain(true)?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Yin
>>>>>
>>>>> On Thu, Nov 5, 2015 at 1:12 AM, 千成徳 <s.c...@opt.ne.jp> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have a data frame like this.
>>>>>>
>>>>>> The equality expression does not work in 1.5.1, but it works as
>>>>>> expected in 1.4.0. What is the difference?
>>>>>>
>>>>>> scala> eventDF.printSchema()
>>>>>> root
>>>>>>  |-- id: string (nullable = true)
>>>>>>  |-- event: string (nullable = true)
>>>>>>  |-- entityType: string (nullable = true)
>>>>>>  |-- entityId: string (nullable = true)
>>>>>>  |-- targetEntityType: string (nullable = true)
>>>>>>  |-- targetEntityId: string (nullable = true)
>>>>>>  |-- properties: string (nullable = true)
>>>>>>
>>>>>> scala> eventDF.groupBy("entityType").agg(countDistinct("entityId")).show
>>>>>> +----------+------------------------+
>>>>>> |entityType|COUNT(DISTINCT entityId)|
>>>>>> +----------+------------------------+
>>>>>> |   ib_user|                    4751|
>>>>>> |      user|                    2091|
>>>>>> +----------+------------------------+
>>>>>>
>>>>>> ----- does not work (bug?)
>>>>>> scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.count
>>>>>> res151: Long = 1219
>>>>>>
>>>>>> scala> eventDF.filter(eventDF("entityType") === "user").select("entityId").distinct.count
>>>>>> res153: Long = 1219
>>>>>>
>>>>>> scala> eventDF.filter($"entityType" equalTo "user").select("entityId").distinct.count
>>>>>> res149: Long = 1219
>>>>>>
>>>>>> ----- works as expected
>>>>>> scala> eventDF.map { e => (e.getAs[String]("entityId"), e.getAs[String]("entityType")) }.filter(x => x._2 == "user").map(_._1).distinct.count
>>>>>> res150: Long = 2091
>>>>>>
>>>>>> scala> eventDF.filter($"entityType" in "user").select("entityId").distinct.count
>>>>>> warning: there were 1 deprecation warning(s); re-run with -deprecation for details
>>>>>> res155: Long = 2091
>>>>>>
>>>>>> scala> eventDF.filter($"entityType" !== "ib_user").select("entityId").distinct.count
>>>>>> res152: Long = 2091
>>>>>>
>>>>>> But all of the above code works in 1.4.0.
>>>>>>
>>>>>> Thanks.
--
-------------------------------------------------------
千 成徳 (Sondoku Chon)

株式会社オプトホールディング (Opt Holding, Inc.) http://www.opt.ne.jp/holding/
Data Science Lab https://datasciencelab.jp/
Big Data Architect

Tokyu Bancho Bldg., 6 Yonbancho, Chiyoda-ku, Tokyo 102-0081
Tel: 080-4581-9708
Fax: 050-3156-7346
-------------------------------------------------------