A DataFrame cache bug

2017-02-21 Thread gen tang
Hi All,

I found a strange bug related to reading data from an updated path combined
with the cache operation.
Please consider the following code:

import org.apache.spark.sql.DataFrame

def f(data: DataFrame): DataFrame = {
  val df = data.filter("id>10")
  df.cache
  df.count
  df
}

f(spark.range(100).asInstanceOf[DataFrame]).count  // output 89 which is correct
f(spark.range(1000).asInstanceOf[DataFrame]).count // output 989 which is correct

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir)
df.count // output 100 which is correct
f(df).count // output 89 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
val df1 = spark.read.parquet(dir)
df1.count // output 1000 which is correct; in fact every operation except df1.filter("id>10") returns the correct result
f(df1).count // output 89 which is incorrect

In fact, when we call df1.filter("id>10"), Spark silently reuses the old
cached DataFrame instead of reading the updated data.
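
For what it's worth, a rough check (not part of the original report) that points at the stale cached plan: if the cached plans are dropped first, the same call appears to see the new data. The expected count below is an assumption, not observed output.

spark.catalog.clearCache()        // drop all cached plans in this session
f(spark.read.parquet(dir)).count  // with the stale cache gone, this should return 989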

Any idea? Thanks a lot

Cheers
Gen


Re: A DataFrame cache bug

2017-02-21 Thread gen tang
Hi All,

I might have found a related issue on JIRA:

https://issues.apache.org/jira/browse/SPARK-15678

This issue is closed; maybe we should reopen it.

Thanks

Cheers
Gen



Re: A DataFrame cache bug

2017-02-21 Thread gen tang
Hi Kazuaki Ishizaki

Thanks a lot for your help. It works. However, an even stranger bug appears,
as follows:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

def f(path: String, spark: SparkSession): DataFrame = {
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // output 88 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // output 88 which is incorrect

If we move refreshByPath into f(), just before spark.read, the whole code
works fine.
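
For concreteness, a sketch of that working variant (only the first line inside f changes):

def f(path: String, spark: SparkSession): DataFrame = {
  spark.catalog.refreshByPath(path)  // refresh here, just before the read
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}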

Any idea? Thanks a lot

Cheers
Gen


On Wed, Feb 22, 2017 at 2:22 PM, Kazuaki Ishizaki wrote:

> Hi,
> Thank you for pointing out the JIRA.
> I think this JIRA suggests that you insert
> "spark.catalog.refreshByPath(dir)".
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> val df = spark.read.parquet(dir)
> df.count // output 100 which is correct
> f(df).count // output 89 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> spark.catalog.refreshByPath(dir)  // insert a NEW statement
> val df1 = spark.read.parquet(dir)
> df1.count // output 1000 which is correct; in fact every operation except
> df1.filter("id>10") returns the correct result.
> f(df1).count // output 89 which is incorrect
>
> Regards,
> Kazuaki Ishizaki


Re: A DataFrame cache bug

2017-02-22 Thread gen tang
Hi,

The example that I provided was not very clear, so I have added a clearer
example to the JIRA.

Thanks

Cheers
Gen



Spark on Teradata?

2015-01-07 Thread gen tang
Hi,

I have a stupid question:
Is it possible to use Spark with a Teradata data warehouse? I have read some
articles on the internet that say yes; however, I couldn't find any example
of how to do it.
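
For what it's worth, a rough sketch of how Spark's generic JDBC data source could be pointed at a Teradata table, assuming a Spark version with the DataFrame JDBC reader (1.4+) and the Teradata JDBC driver jars on the classpath; the host, database, table, and credentials below are placeholders, not anything from this thread:

// Hypothetical sketch: read a Teradata table through the generic JDBC data source.
// URL, table, and credentials are placeholders; terajdbc4.jar must be on the classpath.
val tdDF = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:teradata://my-td-host/DATABASE=mydb")
  .option("driver", "com.teradata.jdbc.TeraDriver")
  .option("dbtable", "mydb.my_table")
  .option("user", "my_user")
  .option("password", "my_password")
  .load()
tdDF.count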

Thanks in advance.

Cheers
Gen


Potential bug broadcastNestedLoopJoin or default value of spark.sql.autoBroadcastJoinThreshold

2015-08-11 Thread gen tang
Hi,

Recently, I used Spark SQL to do a join on a non-equality condition, for
example "condition1 or condition2".

Spark uses BroadcastNestedLoopJoin to do this. Assume that one of the
dataframes (df1) is created neither from a Hive table nor from a local
collection, while the other one (df2) is created from a Hive table. For df1,
Spark estimates its size as defaultSizeInBytes * length of df1, while it uses
the correct size for df2.

As a result, in most cases Spark thinks df1 is bigger than df2 even when df2
is really huge, so Spark does df2.collect() to broadcast it, which causes
errors or slows the program down.

Maybe we should just use defaultSizeInBytes for LogicalRDD, not
defaultSizeInBytes * length?
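
For reference, a small sketch of how the two estimates can be compared (the Hive table name is a placeholder, and df1 here just stands for a dataframe backed by an existing RDD rather than a Hive table):

import sqlContext.implicits._

// df1: created from an existing RDD, so per the description above its size
// estimate is based on defaultSizeInBytes rather than the real data size
val df1 = sc.parallelize(1 to 5).map(i => (i.toLong, s"v$i")).toDF("id", "v")
// df2: backed by a Hive table, so its size comes from Hive statistics
val df2 = sqlContext.table("some_huge_hive_table")

println(df1.queryExecution.analyzed.statistics.sizeInBytes)
println(df2.queryExecution.analyzed.statistics.sizeInBytes)

If df1's estimate comes out larger, the side with the smaller estimate (here the genuinely huge df2) is the one that gets broadcast.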

Hope this could be helpful
Thanks a lot in advance for your help and input.

Cheers
Gen


Re: Potential bug broadcastNestedLoopJoin or default value of spark.sql.autoBroadcastJoinThreshold

2015-08-11 Thread gen tang
Hi,

Thanks a lot.

The problem is not doing a non-equal join on large tables; in fact, one table
is really small and the other one is huge.

The problem is that Spark can only get the correct size for a dataframe
created directly from a Hive table. Even if we create a dataframe from a local
collection, it uses defaultSizeInBytes as its size. (Here I am really
confused: why don't we use LogicalLocalTable in ExistingRDD.scala to estimate
its size? As I understand it, that case class was created for this purpose.)

Then if we do a join or unionAll operation on this dataframe, the estimated
size explodes.

For instance, if we do a join, val df = df1.join(df2, condition), then
df.queryExecution.analyzed.statistics.sizeInBytes equals the product of the
sizes of df1 and df2.

In my case, I created the df1 instance from an existing RDD.

I found a workaround for this problem (a rough sketch follows the steps):
1. save df1 in a Hive table
2. read this Hive table and create a new dataframe
3. do the outer join with this new dataframe
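
A rough sketch of those three steps, assuming a HiveContext; the table name, column names, and join condition are placeholders:

// 1. save df1 in a Hive table (placeholder name)
df1.write.mode("overwrite").saveAsTable("tmp_df1")
// 2. read the Hive table back so the statistics reflect its real size
val df1FromHive = sqlContext.table("tmp_df1")
// 3. do the non-equal outer join with the re-read dataframe
val joined = df1FromHive.join(df2,
  df1FromHive("key") > df2("low") || df1FromHive("key") < df2("high"), "outer")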

Cheers
Gen

On Wed, Aug 12, 2015 at 10:12 AM, Cheng, Hao  wrote:

> Firstly, spark.sql.autoBroadcastJoinThreshold only works for the EQUAL
> JOIN.
>
>
>
> Currently, for the non-equal join, if the join type is the INNER join,
> then it will be done by CartesianProduct join and BroadcastNestedLoopJoin
> works for the outer joins.
>
>
>
> In BroadcastNestedLoopJoin, the table with the smaller estimated size will
> be broadcast, but if that table is also a huge table, I don't think Spark
> SQL can handle it right now (OOM).
>
>
>
> So, I am not sure how you created the df1 instance, but we'd better reflect
> its real size in the statistics and let the framework decide what to do;
> hopefully Spark SQL can support the non-equal join for large tables in the
> next release.
>
>
>
> Hao


Fwd: dataframe slow down with tungsten turn on

2015-11-04 Thread gen tang
Hi,

In fact, I tested the same code with Spark 1.5 with Tungsten turned off, and
the result is much the same as with Tungsten turned on.
It seems that the problem is not Tungsten; it is simply that Spark 1.5 is
slower than Spark 1.4.
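
For reference, a sketch of how Tungsten can be turned off in Spark 1.5 for such a comparison; the original mail does not say which mechanism was used, so the flag below is an assumption based on Spark 1.5's spark.sql.tungsten.enabled setting:

// Sketch: disable Tungsten for the current SQLContext in Spark 1.5,
// then re-run the same ETL job and compare stage times in the UI.
sqlContext.setConf("spark.sql.tungsten.enabled", "false")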

Does anyone have an idea why this happens?
Thanks a lot in advance

Cheers
Gen


-- Forwarded message --
From: gen tang 
Date: Wed, Nov 4, 2015 at 3:54 PM
Subject: dataframe slow down with tungsten turn on
To: "u...@spark.apache.org" 


Hi sparkers,

I am using dataframes to do some large ETL jobs.
More precisely, I create a dataframe from a Hive table, do some operations,
and then save it as JSON.

When I used spark-1.4.1, the whole process was quite fast, about 1 minute.
However, when I use the same code with spark-1.5.1 (with Tungsten turned on),
it takes about 2 hours to finish the same job.

I checked the task details; almost all of the time is consumed by
computation.

Any idea about why this happens?

Thanks a lot in advance for your help.

Cheers
Gen


Fwd: dataframe slow down with tungsten turn on

2015-11-05 Thread gen tang
-- Forwarded message --
From: gen tang 
Date: Fri, Nov 6, 2015 at 12:14 AM
Subject: Re: dataframe slow down with tungsten turn on
To: "Cheng, Hao" 


Hi,

My application is as follows:
1. create a dataframe from a Hive table
2. transform the dataframe to an rdd of json and do some aggregations on the
json (in fact, I use pyspark, so it is an rdd of dicts)
3. retransform the rdd of json into a dataframe and cache it (triggered by a
count)
4. join several dataframes created by the above steps
5. save the final dataframe as JSON (with the dataframe write API)

There are a lot of stages, and the other stages take roughly the same time
under the two versions of Spark. However, the final step (save as JSON) is
1 minute vs. 2 hours. In my opinion, it is the write to HDFS that causes the
slowness of the final stage; however, I don't know why...

In fact, I made a mistake about the version of Spark that I used. The Spark
that runs faster is built from the source code of Spark 1.4.1. The Spark that
runs slower is built from the source code of Spark 1.5.2, from 2 days ago.

Any idea? Thanks a lot.

Cheers
Gen


On Thu, Nov 5, 2015 at 1:01 PM, Cheng, Hao  wrote:

> BTW, 1 min vs. 2 hours seems quite weird; can you provide more information
> on the ETL work?
>
>
>
> From: Cheng, Hao [mailto:hao.ch...@intel.com]
> Sent: Thursday, November 5, 2015 12:56 PM
> To: gen tang; dev@spark.apache.org
> Subject: RE: dataframe slow down with tungsten turn on
>
>
>
> 1.5 has critical performance / bug issues; you'd better try the 1.5.1 or
> 1.5.2 RC version.