In particular this is sounding like: https://issues.apache.org/jira/browse/SPARK-10859
On Fri, Nov 6, 2015 at 1:05 PM, Michael Armbrust <mich...@databricks.com> wrote:

It would be great if you could try:

    sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false")

Also, try Spark 1.5.2-RC2
<http://people.apache.org/~pwendell/spark-releases/spark-1.5.2-rc2-bin/>

On Fri, Nov 6, 2015 at 4:49 AM, Seongduk Cheon <s.c...@opt.ne.jp> wrote:

Hi Yanal!

Yes, exactly. I read from a CSV file and convert it to a DataFrame with column names. It simply looks like this:

    val eventDF = sc.textFile(eventFile).map(_.split(",")).filter(_.size >= 6)
      .map { e => .... // do something
      }.toDF(eventTableColumns: _*).cache()

The result of the <=> function is:

    scala> eventDF.filter($"entityType" <=> "user").select("entityId").distinct.count
    res25: Long = 2091

As you mentioned, it seems related to the nullable columns. Using a case class works as expected; it is one of the best workarounds so far.

2015-11-06 19:01 GMT+09:00 Yanal Wazaefi <yanal.waza...@kelkoo.com>:

Hi Sondoku,

Are you converting an event RDD using the toDF("id", "event", "entityType", "entityId", "targetEntityType", "targetEntityId", "properties") function to get your eventDF?

Does <=> also give you the correct result (2091)?

    eventDF.filter($"entityType" <=> "user").select("entityId").distinct.count

I had the same problem with DataFrame equality when using the toDF("col1", "col2", ...) function.

To resolve this problem (bug?), I used a case class and then applied the toDF() function.
Something like this in your case:

    case class Event(id: String, event: String, entityType: String,
        entityId: String, targetEntityType: String, targetEntityId: String,
        properties: String)

    eventRDD.map { case (id, event, entityType, entityId,
        targetEntityType, targetEntityId, properties) =>
      Event(id, event, entityType, entityId, targetEntityType,
        targetEntityId, properties)
    }.toDF()

The === comparison should work in this case.

The problem (I think) comes from some null values in the columns that come before the column used (e.g. id or event).

Yanal

Subject: Re: DataFrame equality does not working in 1.5.1
Date: Fri, 6 Nov 2015 02:14:18 +0100
From: Seongduk Cheon <s.c...@opt.ne.jp>
To: Yin Huai <yh...@databricks.com>
CC: user <user@spark.apache.org>

Hi, Yin

Thanks for your time. This is the result:

------------------
scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.explain(true)

== Parsed Logical Plan ==
Aggregate [entityId#16], [entityId#16]
 Project [entityId#16]
  Filter (entityType#15 = user)
   Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]
    LogicalRDD [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12], MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61

== Analyzed Logical Plan ==
entityId: string
Aggregate [entityId#16], [entityId#16]
 Project [entityId#16]
  Filter (entityType#15 = user)
   Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]
    LogicalRDD [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12], MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61

== Optimized Logical Plan ==
Aggregate [entityId#16], [entityId#16]
 Project [entityId#16]
  Filter (entityType#15 = user)
   InMemoryRelation [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]), None

== Physical Plan ==
TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
 TungstenExchange hashpartitioning(entityId#16)
  TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
   Project [entityId#16]
    Filter (entityType#15 = user)
     InMemoryColumnarTableScan [entityId#16,entityType#15], [(entityType#15 = user)], (InMemoryRelation [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]), None)

Code Generation: true

scala>

2015-11-06 5:27 GMT+09:00 Yin Huai <yh...@databricks.com>:

Can you attach the result of eventDF.filter($"entityType" === "user").select("entityId").distinct.explain(true)?

Thanks,

Yin

On Thu, Nov 5, 2015 at 1:12 AM, 千成徳 (Seongduk Cheon) <s.c...@opt.ne.jp> wrote:

Hi All,

I have a data frame like this. The equality expression does not work in 1.5.1 but works as expected in 1.4.0. What is the difference?

scala> eventDF.printSchema()
root
 |-- id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- entityType: string (nullable = true)
 |-- entityId: string (nullable = true)
 |-- targetEntityType: string (nullable = true)
 |-- targetEntityId: string (nullable = true)
 |-- properties: string (nullable = true)

scala> eventDF.groupBy("entityType").agg(countDistinct("entityId")).show
+----------+------------------------+
|entityType|COUNT(DISTINCT entityId)|
+----------+------------------------+
|   ib_user|                    4751|
|      user|                    2091|
+----------+------------------------+

----- does not work (bug?)
scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.count
res151: Long = 1219

scala> eventDF.filter(eventDF("entityType") === "user").select("entityId").distinct.count
res153: Long = 1219

scala> eventDF.filter($"entityType" equalTo "user").select("entityId").distinct.count
res149: Long = 1219

----- works as expected
scala> eventDF.map { e => (e.getAs[String]("entityId"), e.getAs[String]("entityType")) }.filter(x => x._2 == "user").map(_._1).distinct.count
res150: Long = 2091

scala> eventDF.filter($"entityType" in "user").select("entityId").distinct.count
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
res155: Long = 2091

scala> eventDF.filter($"entityType" !== "ib_user").select("entityId").distinct.count
res152: Long = 2091

All of the above code works in 1.4.0.

Thanks.

--
-------------------------------------------------------
千 成徳 (Sondoku Chon)

Opt Holding, Inc. http://www.opt.ne.jp/holding/
Data Science Lab https://datasciencelab.jp/
Big Data Architect

Tokyu Bancho Building, 6 Yonbancho, Chiyoda-ku, Tokyo 102-0081
Tel: 080-4581-9708
Fax: 050-3156-7346
-------------------------------------------------------
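Editor's footnote: the wrong counts in this thread are suspected to come from the cached in-memory columnar scan (SPARK-10859, linked at the top), not from the operators themselves. Still, since <=> behaved differently from === here, it may help to recall their semantics: === is three-valued and propagates NULL, while <=> (null-safe equality) always yields a definite Boolean. The sketch below is a plain-Scala model of those semantics only, not Spark code; Option stands in for SQL NULL, and the function names are made up for illustration.

```scala
// Model of Spark SQL's ===: if either side is NULL, the result is
// unknown (None); otherwise compare the values.
def tripleEq(a: Option[String], b: Option[String]): Option[Boolean] =
  for (x <- a; y <- b) yield x == y

// Model of Spark SQL's <=>: NULL <=> NULL is true, NULL <=> value is
// false, and two non-NULL values compare normally.
def nullSafeEq(a: Option[String], b: Option[String]): Boolean =
  a == b

println(tripleEq(Some("user"), Some("user")))  // Some(true)
println(tripleEq(None, Some("user")))          // None (unknown)
println(nullSafeEq(None, Some("user")))        // false
println(nullSafeEq(None, None))                // true
```

Against a non-null literal like "user", a filter drops NULL rows under either operator, so the 2091-vs-1219 gap in the thread points at the cache, not at these semantics.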