Hi Yin,

Thanks for your time. Here is the result.
------------------

scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.explain(true)

== Parsed Logical Plan ==
Aggregate [entityId#16], [entityId#16]
 Project [entityId#16]
  Filter (entityType#15 = user)
   Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]
    LogicalRDD [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12], MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61

== Analyzed Logical Plan ==
entityId: string
Aggregate [entityId#16], [entityId#16]
 Project [entityId#16]
  Filter (entityType#15 = user)
   Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]
    LogicalRDD [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12], MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61

== Optimized Logical Plan ==
Aggregate [entityId#16], [entityId#16]
 Project [entityId#16]
  Filter (entityType#15 = user)
   InMemoryRelation [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]), None

== Physical Plan ==
TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
 TungstenExchange hashpartitioning(entityId#16)
  TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
   Project [entityId#16]
    Filter (entityType#15 = user)
     InMemoryColumnarTableScan [entityId#16,entityType#15], [(entityType#15 = user)], (InMemoryRelation [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS creationTimeZone#25]), None)

Code Generation: true

scala>
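(For reference: the plan above suggests eventDF was built from a tuple RDD via toDF and then cached. A toy reproduction of the same query shape in a 1.5.1 spark-shell might look like the sketch below; the rows and the reduced column set are hypothetical, not the real event data.)

  // Hypothetical toy version of eventDF: tuple RDD converted with toDF, then cached,
  // so the scan goes through an InMemoryRelation as in the plan above.
  import sqlContext.implicits._          // sqlContext is predefined in spark-shell

  val toyDF = sc.parallelize(Seq(
    ("1", "view", "user",    "u1"),
    ("2", "view", "user",    "u2"),
    ("3", "view", "ib_user", "i1")
  )).toDF("id", "event", "entityType", "entityId")

  toyDF.cache()   // later scans use the cached InMemoryRelation

  toyDF.filter($"entityType" === "user").select("entityId").distinct.count
  // 2 distinct user ids in the toy data; on the real eventDF, 1.4.0 gives 2091 and 1.5.1 gives 1219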
2015-11-06 5:27 GMT+09:00 Yin Huai <yh...@databricks.com>:

> Can you attach the result of eventDF.filter($"entityType" ===
> "user").select("entityId").distinct.explain(true)?
>
> Thanks,
>
> Yin
>
> On Thu, Nov 5, 2015 at 1:12 AM, 千成徳 <s.c...@opt.ne.jp> wrote:
>
>> Hi All,
>>
>> I have a data frame like this.
>>
>> The equality expression does not work in 1.5.1, but it works as expected in 1.4.0.
>> What is the difference?
>>
>> scala> eventDF.printSchema()
>> root
>>  |-- id: string (nullable = true)
>>  |-- event: string (nullable = true)
>>  |-- entityType: string (nullable = true)
>>  |-- entityId: string (nullable = true)
>>  |-- targetEntityType: string (nullable = true)
>>  |-- targetEntityId: string (nullable = true)
>>  |-- properties: string (nullable = true)
>>
>> scala> eventDF.groupBy("entityType").agg(countDistinct("entityId")).show
>> +----------+------------------------+
>> |entityType|COUNT(DISTINCT entityId)|
>> +----------+------------------------+
>> |   ib_user|                    4751|
>> |      user|                    2091|
>> +----------+------------------------+
>>
>> ----- does not work (bug?)
>> scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.count
>> res151: Long = 1219
>>
>> scala> eventDF.filter(eventDF("entityType") === "user").select("entityId").distinct.count
>> res153: Long = 1219
>>
>> scala> eventDF.filter($"entityType" equalTo "user").select("entityId").distinct.count
>> res149: Long = 1219
>>
>> ----- works as expected
>> scala> eventDF.map{ e => (e.getAs[String]("entityId"), e.getAs[String]("entityType")) }.filter(x => x._2 == "user").map(_._1).distinct.count
>> res150: Long = 2091
>>
>> scala> eventDF.filter($"entityType" in "user").select("entityId").distinct.count
>> warning: there were 1 deprecation warning(s); re-run with -deprecation for details
>> res155: Long = 2091
>>
>> scala> eventDF.filter($"entityType" !== "ib_user").select("entityId").distinct.count
>> res152: Long = 2091
>>
>> But all of the above code works in 1.4.0.
>>
>> Thanks.
>>

--
-------------------------------------------------------
Sondoku Chon (千 成徳)
Opt Holding Co., Ltd.  http://www.opt.ne.jp/holding/
Data Science Lab  https://datasciencelab.jp/
Big Data Architect
Tokyu Bancho Building, 6 Yonbancho, Chiyoda-ku, Tokyo 102-0081
Tel: 080-4581-9708  Fax: 050-3156-7346
-------------------------------------------------------
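P.S. A self-contained sanity check along the lines of the quoted mail, with hypothetical toy data rather than the real eventDF, could be run on both 1.4.0 and 1.5.1 to see whether the DataFrame filter and the plain RDD filter agree:

  // Hypothetical toy comparison of the two code paths from the quoted mail.
  import sqlContext.implicits._

  val df = sc.parallelize(Seq(
    ("user", "u1"), ("user", "u2"), ("user", "u1"),
    ("ib_user", "i1"), ("ib_user", "i2")
  )).toDF("entityType", "entityId")
  df.cache()

  // DataFrame path: equality filter, then distinct count
  val viaDataFrame = df.filter($"entityType" === "user").select("entityId").distinct.count

  // RDD path: the same condition expressed on Rows
  val viaRDD = df.map(r => (r.getAs[String]("entityId"), r.getAs[String]("entityType")))
                 .filter(_._2 == "user").map(_._1).distinct.count

  println(s"DataFrame path: $viaDataFrame, RDD path: $viaRDD")   // both should be 2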