In particular, this sounds like:
https://issues.apache.org/jira/browse/SPARK-10859
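
To check whether that ticket matches, here is a minimal sketch for a Spark 1.5.x spark-shell. It assumes `sc` and `sqlContext` are the ones the shell provides. The sample rows and the names `sample`, `df`, and `distinctUsers` are made up for illustration, so substitute the cached eventDF from the quoted messages. It runs the same filter with in-memory partition pruning on and off; if the two counts differ, that would be consistent with the ticket.

```scala
// Sketch for a Spark 1.5.x spark-shell; `sc` and `sqlContext` come from the shell.
import sqlContext.implicits._

// Hypothetical sample rows (not the poster's data); some leading columns are null.
val sample = Seq(
  (null: String, "view", "user", "u1"),
  ("2", null: String, "user", "u2"),
  ("3", "buy", "ib_user", "u3")
)

// Same construction as in the thread: toDF with explicit column names, then cache.
val df = sc.parallelize(sample).toDF("id", "event", "entityType", "entityId").cache()

// The query from the thread that returned the unexpected count.
def distinctUsers(): Long =
  df.filter($"entityType" === "user").select("entityId").distinct.count

val pruningKey = "spark.sql.inMemoryColumnarStorage.partitionPruning"

sqlContext.setConf(pruningKey, "true")
val withPruning = distinctUsers()

sqlContext.setConf(pruningKey, "false")
val withoutPruning = distinctUsers()

println(s"with pruning: $withPruning, without pruning: $withoutPruning")
```

The tiny sample is only there to make the snippet complete; it is not claimed to reproduce the wrong count.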

On Fri, Nov 6, 2015 at 1:05 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> It would be great if you could try
> sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false").
> Also, try Spark 1.5.2-RC2
> <http://people.apache.org/~pwendell/spark-releases/spark-1.5.2-rc2-bin/>
>
> On Fri, Nov 6, 2015 at 4:49 AM, Seongduk Cheon <s.c...@opt.ne.jp> wrote:
>
>> Hi Yanal!
>>
>> Yes, exactly. I read from a CSV file and convert it to a DataFrame with
>> column names.
>>
>> It simply looks like this:
>>
>> val eventDF = sc.textFile(eventFile).map(_.split(",")).filter(_.size >= 6)
>>   .map { e => .... // do something with each row
>>   }.toDF(eventTableColumns:_*).cache()
>>
>>
>> The result of the <=> operator is:
>>
>> scala> eventDF.filter($"entityType" <=>
>> "user").select("entityId").distinct.count
>> res25: Long = 2091
>>
>> As you mentioned, it seems to be related to nullable columns.
>> Using a case class works as expected. It is one of the best workarounds so
>> far.
>>
>>
>>
>> 2015-11-06 19:01 GMT+09:00 Yanal Wazaefi <yanal.waza...@kelkoo.com>:
>>
>>> Hi Sondoku,
>>>
>>> Are you converting an event RDD using the toDF("id", "event", "entityType",
>>> "entityId", "targetEntityType", "targetEntityId", "properties")
>>> function to get your eventDF?
>>>
>>> Does <=> give you the correct result (2091) too?
>>> eventDF.filter($"entityType" <=>
>>> "user").select("entityId").distinct.count
>>>
>>> I had the same problem with the DataFrame equality, using toDF("col1",
>>> "col2", ...) function.
>>>
>>> To resolve this problem (bug?), I used a *case class* and then
>>> applied the toDF() function.
>>> Something like this in your case:
>>>
>>> case class Event(id: String, event: String, entityType: String,
>>>   entityId: String, targetEntityType: String, targetEntityId: String,
>>>   properties: String)
>>>
>>> eventRDD.map { case (id, event, entityType, entityId, targetEntityType,
>>>     targetEntityId, properties) =>
>>>   Event(id, event, entityType, entityId, targetEntityType, targetEntityId,
>>>     properties)
>>> }.toDF()
>>>
>>> The comparison === should work in this case.
>>>
>>> The problem (I think) comes from some null values in the columns that
>>> come before the filtered column (e.g. id or event).
>>>
>>> Yanal
>>>
>>> Subject: Re: DataFrame equality does not working in 1.5.1
>>> Date: Fri, 6 Nov 2015 02:14:18 +0100
>>> From: Seongduk Cheon <s.c...@opt.ne.jp>
>>> To: Yin Huai <yh...@databricks.com>
>>> CC: user <user@spark.apache.org>
>>>
>>>
>>> Hi, Yin
>>>
>>> Thanks for your time. This is the result.
>>>
>>> ------------------
>>> scala> eventDF.filter($"entityType" ===
>>> "user").select("entityId").distinct.explain(true)
>>> == Parsed Logical Plan ==
>>> Aggregate [entityId#16], [entityId#16]
>>>  Project [entityId#16]
>>>   Filter (entityType#15 = user)
>>>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
>>> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
>>> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>>> creationTimeZone#25]
>>>     LogicalRDD
>>> [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
>>> MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>>>
>>> == Analyzed Logical Plan ==
>>> entityId: string
>>> Aggregate [entityId#16], [entityId#16]
>>>  Project [entityId#16]
>>>   Filter (entityType#15 = user)
>>>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
>>> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
>>> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>>> creationTimeZone#25]
>>>     LogicalRDD
>>> [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
>>> MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>>>
>>> == Optimized Logical Plan ==
>>> Aggregate [entityId#16], [entityId#16]
>>>  Project [entityId#16]
>>>   Filter (entityType#15 = user)
>>>    InMemoryRelation
>>> [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
>>> true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject
>>> [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
>>> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
>>> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>>> creationTimeZone#25]), None
>>>
>>> == Physical Plan ==
>>> TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>>>  TungstenExchange hashpartitioning(entityId#16)
>>>   TungstenAggregate(key=[entityId#16], functions=[],
>>> output=[entityId#16])
>>>    Project [entityId#16]
>>>     Filter (entityType#15 = user)
>>>      InMemoryColumnarTableScan [entityId#16,entityType#15],
>>> [(entityType#15 = user)], (InMemoryRelation
>>> [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
>>> true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject
>>> [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
>>> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
>>> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>>> creationTimeZone#25]), None)
>>>
>>> Code Generation: true
>>>
>>> scala>
>>>
>>>
>>>
>>> 2015-11-06 5:27 GMT+09:00 Yin Huai <yh...@databricks.com>:
>>>
>>>> Can you attach the result of eventDF.filter($"entityType" ===
>>>> "user").select("entityId").distinct.explain(true)?
>>>>
>>>> Thanks,
>>>>
>>>> Yin
>>>>
>>>> On Thu, Nov 5, 2015 at 1:12 AM, 千成徳 <s.c...@opt.ne.jp> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have a data frame like this.
>>>>>
>>>>> The equality expression does not work in 1.5.1, but works as expected in
>>>>> 1.4.0.
>>>>> What is the difference?
>>>>>
>>>>> scala> eventDF.printSchema()
>>>>> root
>>>>>  |-- id: string (nullable = true)
>>>>>  |-- event: string (nullable = true)
>>>>>  |-- entityType: string (nullable = true)
>>>>>  |-- entityId: string (nullable = true)
>>>>>  |-- targetEntityType: string (nullable = true)
>>>>>  |-- targetEntityId: string (nullable = true)
>>>>>  |-- properties: string (nullable = true)
>>>>>
>>>>> scala>
>>>>> eventDF.groupBy("entityType").agg(countDistinct("entityId")).show
>>>>> +----------+------------------------+
>>>>> |entityType|COUNT(DISTINCT entityId)|
>>>>> +----------+------------------------+
>>>>> |   ib_user|                    4751|
>>>>> |      user|                    2091|
>>>>> +----------+------------------------+
>>>>>
>>>>>
>>>>> ----- does not work (bug?)
>>>>> scala> eventDF.filter($"entityType" ===
>>>>> "user").select("entityId").distinct.count
>>>>> res151: Long = 1219
>>>>>
>>>>> scala> eventDF.filter(eventDF("entityType") ===
>>>>> "user").select("entityId").distinct.count
>>>>> res153: Long = 1219
>>>>>
>>>>> scala> eventDF.filter($"entityType" equalTo
>>>>> "user").select("entityId").distinct.count
>>>>> res149: Long = 1219
>>>>>
>>>>> ----- works as expected
>>>>> scala> eventDF.map{ e => (e.getAs[String]("entityId"),
>>>>> e.getAs[String]("entityType")) }.filter(x => x._2 ==
>>>>> "user").map(_._1).distinct.count
>>>>> res150: Long = 2091
>>>>>
>>>>> scala> eventDF.filter($"entityType" in
>>>>> "user").select("entityId").distinct.count
>>>>> warning: there were 1 deprecation warning(s); re-run with -deprecation
>>>>> for details
>>>>> res155: Long = 2091
>>>>>
>>>>> scala> eventDF.filter($"entityType" !==
>>>>> "ib_user").select("entityId").distinct.count
>>>>> res152: Long = 2091
>>>>>
>>>>>
>>>>> But all of the above code works in 1.4.0.
>>>>>
>>>>> Thanks.
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> -------------------------------------------------------
>>> 千 成徳 (Sondoku Chon)
>>>
>>> OPT Holding, Inc. http://www.opt.ne.jp/holding/
>>> Data Science Lab https://datasciencelab.jp/
>>> Big Data Architect
>>>
>>> Tokyu Bancho Building, 6 Yonbancho, Chiyoda-ku, Tokyo 102-0081
>>> Tel:080-4581-9708
>>> Fax:050-3156-7346
>>> -------------------------------------------------------
>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>>
>
>
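
On the === versus <=> results quoted above, here is a small plain-Scala sketch of how the two comparisons treat NULL, with Option standing in for a nullable column value. It models SQL semantics only and is not Spark's implementation; the object and function names are hypothetical. With a non-null literal such as "user", both comparisons keep the same rows, so null handling alone would not explain differing counts.

```scala
object NullSafeEqualityDemo {
  // None stands in for SQL NULL.
  type Cell = Option[String]

  // Models `=` (written === on a Column): unknown (None) when either side is NULL.
  def sqlEq(a: Cell, b: Cell): Option[Boolean] =
    for (x <- a; y <- b) yield x == y

  // Models `<=>`: never unknown; NULL <=> NULL is true.
  def nullSafeEq(a: Cell, b: Cell): Boolean = a == b

  // A filter keeps a row only when the predicate is definitely true.
  def keeps(p: Option[Boolean]): Boolean = p.getOrElse(false)

  def main(args: Array[String]): Unit = {
    // Hypothetical column values, including one NULL.
    val column: Seq[Cell] = Seq(Some("user"), Some("ib_user"), None, Some("user"))
    val literal: Cell = Some("user")

    val keptByEq = column.count(c => keeps(sqlEq(c, literal)))
    val keptByNullSafe = column.count(c => nullSafeEq(c, literal))

    println(s"=== keeps $keptByEq rows, <=> keeps $keptByNullSafe rows")
  }
}
```

The two operators only disagree when both sides can be NULL, which is not the case when comparing against a string literal.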
