Hi, Michael

It works fine.

scala> sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false")
res28: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> eventDF.filter($"entityType" ===
"user").select("entityId").distinct.count
res29: Long = 2091

Thank you so much for helping me.
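
For anyone finding this thread later, the workaround can be sketched as a small standalone program. This is only a sketch under assumptions: it targets the Spark 1.5-era SQLContext API, and the object name PruningWorkaround, the helper distinctUserCounts, and the four sample rows are made up for illustration (they are not my real data). It turns off batch pruning for cached data and then cross-checks the DataFrame count against a plain RDD filter.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object PruningWorkaround {
  // Returns (count via DataFrame filter, count via plain RDD filter).
  def distinctUserCounts(sc: SparkContext): (Long, Long) = {
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Hypothetical rows standing in for the real event file.
    val eventDF = sc.parallelize(Seq(
      ("user", "u1"), ("user", "u2"), ("ib_user", "u3"), ("user", "u1")
    )).toDF("entityType", "entityId").cache()

    // Same setting as the SET statement above, applied through setConf.
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.partitionPruning", "false")

    val viaDataFrame = eventDF.filter($"entityType" === "user")
      .select("entityId").distinct.count()

    // Cross-check: apply the filter in plain Scala instead of as a Column predicate.
    val viaRdd = eventDF.rdd
      .filter(_.getAs[String]("entityType") == "user")
      .map(_.getAs[String]("entityId"))
      .distinct.count()

    (viaDataFrame, viaRdd)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("pruning-workaround").setMaster("local[2]"))
    try println(distinctUserCounts(sc)) finally sc.stop()
  }
}
```

If the two numbers differ on a cached DataFrame, that is the symptom I reported at the start of this thread.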

2015-11-07 6:13 GMT+09:00 Michael Armbrust <mich...@databricks.com>:

> In particular, this sounds like:
> https://issues.apache.org/jira/browse/SPARK-10859
>
> On Fri, Nov 6, 2015 at 1:05 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> It would be great if you could try
>> sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false").
>> Also, try Spark 1.5.2-RC2:
>> <http://people.apache.org/~pwendell/spark-releases/spark-1.5.2-rc2-bin/>
>>
>> On Fri, Nov 6, 2015 at 4:49 AM, Seongduk Cheon <s.c...@opt.ne.jp> wrote:
>>
>>> Hi Yanal!
>>>
>>> Yes, exactly. I read from a CSV file and convert it to a DataFrame with
>>> column names.
>>>
>>> It simply looks like this:
>>>
>>> val eventDF = sc.textFile(eventFile).map(_.split(",")).filter(_.size >= 6)
>>>   .map { e => .... // do something
>>>   }.toDF(eventTableColumns:_*).cache()
>>>
>>>
>>> The result of <=> function is
>>>
>>> scala> eventDF.filter($"entityType" <=>
>>> "user").select("entityId").distinct.count
>>> res25: Long = 2091
>>>
>>> As you mentioned, it seems related to nullable columns.
>>> Using a case class works as expected. It is the best workaround so
>>> far.
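
For reference, the difference between === and <=> is only in how they treat null. The following is a plain-Scala model of that behaviour, not Spark code: the object NullSemantics and its helpers are hypothetical names, and None stands in for a SQL NULL.

```scala
// A small model of SQL three-valued logic; None stands for NULL.
object NullSemantics {
  // Models Column `===` (SQL `=`): the result is NULL if either side is NULL.
  def sqlEquals(a: Option[String], b: Option[String]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y

  // Models Column `<=>` (null-safe equality): never NULL, two NULLs are equal.
  def nullSafeEquals(a: Option[String], b: Option[String]): Boolean = a == b

  // A filter keeps a row only when its predicate is exactly TRUE.
  def keeps(predicate: Option[Boolean]): Boolean = predicate.contains(true)

  def main(args: Array[String]): Unit = {
    val user = Some("user")
    println(sqlEquals(Some("user"), user))  // Some(true)
    println(sqlEquals(None, user))          // None
    println(nullSafeEquals(None, user))     // false
    println(nullSafeEquals(None, None))     // true
  }
}
```

In this model, a row whose entityType is null is dropped by both operators when the right-hand side is the non-null literal "user", so null handling by itself would not be expected to change the count.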
>>>
>>>
>>>
>>> 2015-11-06 19:01 GMT+09:00 Yanal Wazaefi <yanal.waza...@kelkoo.com>:
>>>
>>>> Hi Sondoku,
>>>>
>>>> Are you converting an event RDD with the toDF("id", "event", "entityType",
>>>> "entityId", "targetEntityType", "targetEntityId", "properties")
>>>> function to get your eventDF?
>>>>
>>>> Does <=> give you the correct result (2091) too?
>>>> eventDF.filter($"entityType" <=>
>>>> "user").select("entityId").distinct.count
>>>>
>>>> I had the same problem with DataFrame equality when using the
>>>> toDF("col1", "col2", ...) function.
>>>>
>>>> To resolve this problem (bug?), I used a *case class* and then
>>>> applied the toDF() function.
>>>> Something like this in your case:
>>>>
>>>> case class Event(id: String, event: String, entityType: String,
>>>>   entityId: String, targetEntityType: String, targetEntityId: String,
>>>>   properties: String)
>>>>
>>>> eventRDD.map { case (id, event, entityType, entityId, targetEntityType,
>>>>     targetEntityId, properties) =>
>>>>   Event(id, event, entityType, entityId, targetEntityType, targetEntityId,
>>>>     properties)
>>>> }.toDF()
>>>>
>>>> The === comparison should work in this case.
>>>>
>>>> The problem (I think) comes from null values in the columns that come
>>>> before the entityType column (e.g. id or event).
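
A self-contained version of the case class approach might look like the sketch below. It assumes the Spark 1.5-era SQLContext API. The object name CaseClassToDF, the helper toEventDF, the reduced four-column Event, and the sample lines are all hypothetical and only serve to keep the example short.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

// Declared at top level so Spark can derive the schema by reflection.
// Only four of the thread's columns are kept, to keep the sketch short.
case class Event(id: String, event: String, entityType: String, entityId: String)

object CaseClassToDF {
  def toEventDF(sc: SparkContext, lines: Seq[String]): DataFrame = {
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    sc.parallelize(lines)
      .map(_.split(","))
      .filter(_.length >= 4)                    // skip rows that are too short
      .map(a => Event(a(0), a(1), a(2), a(3)))
      .toDF()                                   // column names come from the case class fields
      .cache()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("case-class-todf").setMaster("local[2]"))
    try {
      // Hypothetical sample lines, including one malformed row.
      val df = toEventDF(sc, Seq(
        "1,view,user,u1", "2,buy,user,u2", "3,view,ib_user,u3", "bad,row"))
      df.printSchema()
      println(df.filter(df("entityType") === "user")
        .select("entityId").distinct.count())
    } finally sc.stop()
  }
}
```

Compared with toDF("col1", "col2", ...), the column names and types here are taken from the case class definition rather than passed as strings.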
>>>>
>>>> Yanal
>>>>
>>>> Subject: Re: DataFrame equality does not working in 1.5.1
>>>> Date: Fri, 6 Nov 2015 02:14:18 +0100
>>>> From: Seongduk Cheon <s.c...@opt.ne.jp>
>>>> To: Yin Huai <yh...@databricks.com>
>>>> CC: user <user@spark.apache.org>
>>>>
>>>>
>>>> Hi, Yin
>>>>
>>>> Thanks for your time. This is the result.
>>>>
>>>> ------------------
>>>> scala> eventDF.filter($"entityType" ===
>>>> "user").select("entityId").distinct.explain(true)
>>>> == Parsed Logical Plan ==
>>>> Aggregate [entityId#16], [entityId#16]
>>>>  Project [entityId#16]
>>>>   Filter (entityType#15 = user)
>>>>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3
>>>> AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6
>>>> AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>>>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>>>> creationTimeZone#25]
>>>>     LogicalRDD
>>>> [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
>>>> MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>>>>
>>>> == Analyzed Logical Plan ==
>>>> entityId: string
>>>> Aggregate [entityId#16], [entityId#16]
>>>>  Project [entityId#16]
>>>>   Filter (entityType#15 = user)
>>>>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3
>>>> AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6
>>>> AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>>>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>>>> creationTimeZone#25]
>>>>     LogicalRDD
>>>> [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
>>>> MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>>>>
>>>> == Optimized Logical Plan ==
>>>> Aggregate [entityId#16], [entityId#16]
>>>>  Project [entityId#16]
>>>>   Filter (entityType#15 = user)
>>>>    InMemoryRelation
>>>> [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
>>>> true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject
>>>> [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
>>>> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
>>>> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>>>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>>>> creationTimeZone#25]), None
>>>>
>>>> == Physical Plan ==
>>>> TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>>>>  TungstenExchange hashpartitioning(entityId#16)
>>>>   TungstenAggregate(key=[entityId#16], functions=[],
>>>> output=[entityId#16])
>>>>    Project [entityId#16]
>>>>     Filter (entityType#15 = user)
>>>>      InMemoryColumnarTableScan [entityId#16,entityType#15],
>>>> [(entityType#15 = user)], (InMemoryRelation
>>>> [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
>>>> true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject
>>>> [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
>>>> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
>>>> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>>>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>>>> creationTimeZone#25]), None)
>>>>
>>>> Code Generation: true
>>>>
>>>> scala>
>>>>
>>>>
>>>>
>>>> 2015-11-06 5:27 GMT+09:00 Yin Huai <yh...@databricks.com>:
>>>>
>>>>> Can you attach the result of eventDF.filter($"entityType" ===
>>>>> "user").select("entityId").distinct.explain(true)?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Yin
>>>>>
>>>>> On Thu, Nov 5, 2015 at 1:12 AM, 千成徳 <s.c...@opt.ne.jp> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have a DataFrame like the one below.
>>>>>>
>>>>>> The equality expression does not work in 1.5.1, but works as expected
>>>>>> in 1.4.0.
>>>>>> What is the difference?
>>>>>>
>>>>>> scala> eventDF.printSchema()
>>>>>> root
>>>>>>  |-- id: string (nullable = true)
>>>>>>  |-- event: string (nullable = true)
>>>>>>  |-- entityType: string (nullable = true)
>>>>>>  |-- entityId: string (nullable = true)
>>>>>>  |-- targetEntityType: string (nullable = true)
>>>>>>  |-- targetEntityId: string (nullable = true)
>>>>>>  |-- properties: string (nullable = true)
>>>>>>
>>>>>> scala>
>>>>>> eventDF.groupBy("entityType").agg(countDistinct("entityId")).show
>>>>>> +----------+------------------------+
>>>>>> |entityType|COUNT(DISTINCT entityId)|
>>>>>> +----------+------------------------+
>>>>>> |   ib_user|                    4751|
>>>>>> |      user|                    2091|
>>>>>> +----------+------------------------+
>>>>>>
>>>>>>
>>>>>> ----- does not work (bug?)
>>>>>> scala> eventDF.filter($"entityType" ===
>>>>>> "user").select("entityId").distinct.count
>>>>>> res151: Long = 1219
>>>>>>
>>>>>> scala> eventDF.filter(eventDF("entityType") ===
>>>>>> "user").select("entityId").distinct.count
>>>>>> res153: Long = 1219
>>>>>>
>>>>>> scala> eventDF.filter($"entityType" equalTo
>>>>>> "user").select("entityId").distinct.count
>>>>>> res149: Long = 1219
>>>>>>
>>>>>> ----- works as expected
>>>>>> scala> eventDF.map{ e => (e.getAs[String]("entityId"),
>>>>>> e.getAs[String]("entityType")) }.filter(x => x._2 ==
>>>>>> "user").map(_._1).distinct.count
>>>>>> res150: Long = 2091
>>>>>>
>>>>>> scala> eventDF.filter($"entityType" in
>>>>>> "user").select("entityId").distinct.count
>>>>>> warning: there were 1 deprecation warning(s); re-run with
>>>>>> -deprecation for details
>>>>>> res155: Long = 2091
>>>>>>
>>>>>> scala> eventDF.filter($"entityType" !==
>>>>>> "ib_user").select("entityId").distinct.count
>>>>>> res152: Long = 2091
>>>>>>
>>>>>>
>>>>>> However, all of the above code works in 1.4.0.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> -------------------------------------------------------
>>>> 千 成徳 (Sondoku Chon)
>>>>
>>>> 株式会社オプトホールディング http://www.opt.ne.jp/holding/
>>>> Data Science Lab https://datasciencelab.jp/
>>>> Big Data Architect
>>>>
>>>> 〒102-0081 東京都千代田区四番町6東急番町ビル
>>>> Tel:080-4581-9708
>>>> Fax:050-3156-7346
>>>> -------------------------------------------------------
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>>
>


