Big performance difference when joining 3 tables in different order

2015-06-04 Thread Hao Ren
Hi, I encountered a performance issue when joining 3 tables in Spark SQL. Here is the query: SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt FROM t_category c, t_zipcode z, click_meter_site_grouped g WHERE c.refCategoryID = g.category AND z.regionCode = g.region I need to pay a
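
A sketch of one way to investigate this: write the same query with explicit JOIN ... ON clauses so the join order is visible and easy to permute, then compare the plans. sqlContext is assumed; this rewrite is an illustration, not the fix reported in the thread.

    // Same query, but with the join order made explicit and easy to permute.
    val df = sqlContext.sql("""
      SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
      FROM click_meter_site_grouped g
      JOIN t_category c ON c.refCategoryID = g.category
      JOIN t_zipcode  z ON z.regionCode = g.region
    """)
    df.explain()   // compare the physical plans produced by different orderings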

Re: map vs mapPartitions

2015-06-25 Thread Hao Ren
hasNext()) { output.add(input.next().length()); } return output; } }); Here, is the input present in memory, and can it contain a complete partition of GBs?
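
On the question quoted above (whether the whole partition must sit in memory), a minimal Scala sketch of the usual distinction: the Iterator handed to mapPartitions is lazy, and it is collecting it into a local list that materializes the partition. The RDD below is illustrative.

    val rdd = sc.parallelize(Seq("spark", "streaming", "sql"))   // any RDD[String]

    // Lazy: the partition is streamed through the iterator, one element at a time.
    val lengthsLazy = rdd.mapPartitions(iter => iter.map(_.length))

    // Eager: copying the iterator into a local collection pulls the whole partition
    // into memory, which is effectively what the quoted Java snippet does with its output list.
    val lengthsEager = rdd.mapPartitions { iter =>
      val out = scala.collection.mutable.ArrayBuffer[Int]()
      iter.foreach(s => out += s.length)
      out.iterator
    }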

Fwd: map vs mapPartitions

2015-06-25 Thread Hao Ren
-- Forwarded message -- From: Hao Ren Date: Thu, Jun 25, 2015 at 7:03 PM Subject: Re: map vs mapPartitions To: Shushant Arora In fact, map and mapPartitions produce RDDs of the same type: MapPartitionsRDD. Check the RDD API source code below: def map[U: ClassTag](f: T => U): RD
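
A short sketch of the equivalence this message points at; the RDD below is illustrative.

    val rdd = sc.parallelize(Seq("spark", "streaming", "sql"))   // any RDD[String]

    val a = rdd.map(_.length)                    // element-wise function
    val b = rdd.mapPartitions(_.map(_.length))   // same work expressed per partition
    // Both return a MapPartitionsRDD; mapPartitions is mainly useful for per-partition
    // setup/teardown (e.g. opening a connection once) around the element-wise work.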

SparkContext and JavaSparkContext

2015-06-29 Thread Hao Ren
Hi, I am working on a legacy project using Spark Java code. I have a function which takes sqlContext as an argument; however, I need a JavaSparkContext in that function. It seems that sqlContext.sparkContext() returns a Scala SparkContext. I did not find any API for casting a Scala SparkContext t
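
For reference, a minimal sketch of the standard way to wrap a Scala SparkContext for Java-oriented code:

    import org.apache.spark.api.java.JavaSparkContext

    val sc  = sqlContext.sparkContext            // the Scala SparkContext held by the SQLContext
    val jsc = new JavaSparkContext(sc)           // wrap it for Java-friendly APIs
    // equivalently: val jsc = JavaSparkContext.fromSparkContext(sc)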

Re: SparkContext and JavaSparkContext

2015-06-29 Thread Hao Ren
at 11:15 AM, Hao Ren wrote: > Hi, > > I am working on a legacy project using Spark Java code. > > I have a function which takes sqlContext as an argument; however, I need a > JavaSparkContext in that function. > > It seems that sqlContext.sparkContext() returns a Scala sparkCon

[SPARK-SQL] Re-use col alias in the select clause to avoid sub query

2015-07-06 Thread Hao Ren
Hi, I want to re-use a column alias in the select clause to avoid a sub query. For example: select check(key) as b, abs(b) as abs, value1, value2, ..., value30 from test The query above does not work, because b is not defined in the schema of test. Instead, I should change the query to the followi
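
A sketch of the two usual workarounds: repeat the expression, or push the alias into a sub query. check is assumed to be a registered UDF, and only a couple of the value columns are shown.

    // Option 1: repeat the expression instead of referring to the alias.
    sqlContext.sql("SELECT check(key) AS b, abs(check(key)) AS abs, value1, value2 FROM test")

    // Option 2: compute the alias in a sub query so the outer select can reference it.
    sqlContext.sql("SELECT b, abs(b) AS abs, value1, value2 FROM (SELECT check(key) AS b, value1, value2 FROM test) t")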

spark-submit can not resolve spark-hive_2.10

2015-07-07 Thread Hao Ren
I want to add spark-hive as a dependency to submit my job, but it seems that spark-submit can not resolve it. $ ./bin/spark-submit \ → --packages org.apache.spark:spark-hive_2.10:1.4.0,org.postgresql:postgresql:9.3-1103-jdbc3,joda-time:joda-time:2.8.1 \ → --class fr.leboncoin.etl.jobs.dwh.AdStateT

[SPARK-SQL] Window Functions optimization

2015-07-13 Thread Hao Ren
Hi, I would like to know: has any optimization been done for window functions in Spark SQL? For example: select key, max(value1) over(partition by key) as m1, max(value2) over(partition by key) as m2, max(value3) over(partition by key) as m3 from table The query above creates 3 fields
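
A small sketch of how to check what the question asks, by inspecting the physical plan; whether the three identical partition specs share one shuffle depends on the Spark version, so this only shows how to look. hiveContext is assumed (window functions required it at the time), and the table name is illustrative.

    val q = hiveContext.sql("""
      SELECT key,
             max(value1) OVER (PARTITION BY key) AS m1,
             max(value2) OVER (PARTITION BY key) AS m2,
             max(value3) OVER (PARTITION BY key) AS m3
      FROM my_table
    """)
    q.explain()   // count how many Exchange / Window operators appear for the same partition spec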

Re: spark-submit can not resolve spark-hive_2.10

2015-07-15 Thread Hao Ren
e spark-assembly by adding -Phive during mvn package or sbt assembly. > > Best, > Burak > > On Tue, Jul 7, 2015 at 8:06 AM, Hao Ren wrote: > >> I want to add spark-hive as a dependence to submit my job, but it seems >> that >> spark-submit can not reso

S3 Read / Write makes executors deadlocked

2015-07-16 Thread Hao Ren
.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) -- Hao Ren Data Engineer @ leboncoin Paris, France

Re: S3 Read / Write makes executors deadlocked

2015-07-16 Thread Hao Ren
mmon use case. Any help on this issue is highly appreciated. If you need more info, checkout the jira I created: https://issues.apache.org/jira/browse/SPARK-8869 On Thu, Jul 16, 2015 at 11:39 AM, Hao Ren wrote: > Given the following code which just reads from s3, then saves files to s3

[Streaming] Difference between windowed stream and stream with large batch size?

2016-03-07 Thread Hao Ren
users observe the same result ? 2. If yes, what is the advantage of one vs. the other? Performance or something else? 3. Is a stream with a large batch reasonable, say 30 mins or even an hour? Thank you. -- Hao Ren Data Engineer @ leboncoin Paris, France

Re: [Streaming] Difference between windowed stream and stream with large batch size?

2016-03-16 Thread Hao Ren
Any ideas? Feel free to ask for more details if my questions are not clear. Thank you. On Mon, Mar 7, 2016 at 3:38 PM, Hao Ren wrote: > I want to understand the advantage of using a windowed stream. > > For example, > > Stream 1: > initial duration = 5 s, > and then tran
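
A minimal sketch of the two set-ups being compared; the durations and the socket source are illustrative, and the two contexts are shown side by side only for comparison (in practice you would build one or the other).

    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    // Option A: small batches plus a 30-minute non-overlapping window.
    val ssc = new StreamingContext(sc, Seconds(5))
    val events = ssc.socketTextStream("localhost", 9999)      // illustrative source
    val windowed = events.window(Minutes(30), Minutes(30))    // windowLength, slideInterval

    // Option B: one big batch every 30 minutes, no window at all.
    val sscBig = new StreamingContext(sc, Minutes(30))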

[Streaming] textFileStream has no events shown in web UI

2016-03-16 Thread Hao Ren
output of the stream is also correct. Not sure why the web UI has not detected any events. Thank you. -- Hao Ren Data Engineer @ leboncoin Paris, France

Can not kill driver properly

2016-03-21 Thread Hao Ren
workaround is to ssh to the driver node, then kill -9 ... jps shows the same classname DriverWrapper, so one needs to pick carefully... Any idea why this happens ? BTW, my streaming job's batch duration is one hour. So do we need to wait for the job to finish processing in order to kill the driver ? -- Hao Ren

Re: Can not kill driver properly

2016-03-21 Thread Hao Ren
Update: I am using --supervise flag for fault tolerance. On Mon, Mar 21, 2016 at 4:16 PM, Hao Ren wrote: > Using spark 1.6.1 > Spark Streaming Jobs are submitted via spark-submit (cluster mode) > > I tried to kill drivers via webUI, it does not work. These drivers are > sti

SQLContext and HiveContext parse a query string differently ?

2016-05-12 Thread Hao Ren
orToken(unclosed string literal) found // case 3 df.selectExpr("cast(a as array)").show() // OK with HiveContext and SQLContext // case 4 df.selectExpr("'a\\'b'").show() // HiveContext, SQLContext => failure: end of input expected } - Any clarification / workaround is highly appreciated. -- Hao Ren Data Engineer @ leboncoin Paris, France

Re: SQLContext and HiveContext parse a query string differently ?

2016-05-13 Thread Hao Ren
primitive type specification; line 1 pos 17 > // SQLContext => OK > > // case 2 > context.sql("select 'a\\'b'").show() > // HiveContext => OK > // SQLContext => failure: ``union'' expected but ErrorToken(unclosed string > literal

[MLlib] Term Frequency in TF-IDF seems incorrect

2016-08-01 Thread Hao Ren
? -- Hao Ren Data Engineer @ leboncoin Paris, France

[SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-07 Thread Hao Ren
"key" === 2).show() // *It does not work as expected (org.apache.spark.SparkException: Task not serializable)* } run() } Also, I tried collect(), count(), first(), limit(). All of them worked without non-serializable exceptions. It seems only filter() throws the exception

Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Hao Ren
Yes, it is. You can define a udf like that. Basically, it's a udf Int => Int which is a closure that contains a non-serializable object. The latter should cause a Task not serializable exception. Hao On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar wrote: > Hello Hao Ren, > >
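
A minimal sketch of the reproduction described in the thread; the class and value names are made up, and spark refers to a Spark 2.0 SparkSession.

    import org.apache.spark.sql.functions.udf
    import spark.implicits._                          // for the $"col" syntax and toDF

    class NotSerializable { def offset: Int = 1 }     // note: does NOT extend Serializable
    val helper = new NotSerializable

    val df = Seq(0, 1, 2, 3).toDF("key")
    val plusOffset = udf((i: Int) => i + helper.offset)   // Int => Int closure capturing helper

    // Per the thread, filter() raises "Task not serializable" with such a UDF,
    // while collect(), count(), first() and limit() did not.
    df.filter(plusOffset($"key") === 2).show()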

Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Hao Ren
Ints are serializable? Just thinking out loud. Simon Scott, Research Developer @ viavisolutions.com *From:* Hao Ren [mailto:inv...@gmail.com] *Sent:

Unresolved dep when building project with spark 1.6

2016-02-29 Thread Hao Ren
t.ResolveException: unresolved dependency: org.fusesource.leveldbjni#leveldbjni-all;1.8: org.fusesource.leveldbjni#leveldbjni-all;1.8!leveldbjni-all.pom(pom.original) origin location must be absolute: file:/home/invkrh/.m2/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldbjni-all-1.8.pom Thank you. -- Hao Ren Data Engineer @ leboncoin Paris, France

DataFrame writer removes fields which are null for all rows

2015-07-21 Thread Hao Ren
's a default behavior for DF. But I would like to keep the null fields for schema consistency. Are there some options/configs for this purpose ? Thx. -- Hao Ren Data Engineer @ leboncoin Paris, France

How to distribute non-serializable object in transform task or broadcast ?

2015-08-07 Thread Hao Ren
lp is appreciated. -- Hao Ren Data Engineer @ leboncoin Paris, France
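
A sketch of the usual workaround for this kind of question: construct the non-serializable object on the executor side, once per partition or once per JVM, rather than capturing it in the closure. Parser is a made-up stand-in for a non-serializable third-party class.

    class Parser { def parse(s: String): Array[String] = s.split(",") }   // stand-in, not Serializable

    val rdd = sc.parallelize(Seq("1,a", "2,b"))

    // Built on the executor, once per partition; never serialized with the task.
    val parsed = rdd.mapPartitions { iter =>
      val parser = new Parser()
      iter.map(line => parser.parse(line))
    }

    // Alternative: a lazy val inside an object, initialized once per executor JVM on first use.
    object ParserHolder { lazy val parser = new Parser() }
    val parsed2 = rdd.map(line => ParserHolder.parser.parse(line))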

ClosureCleaner does not work for java code

2015-08-10 Thread Hao Ren
JFunction[T, R]): JavaRDD[R] = new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag) You can reproduce this issue easily, any help is appreciated. -- Hao Ren Data Engineer @ leboncoin Paris, France

[Spark Streaming] map and window operation on DStream only process one batch

2016-11-22 Thread Hao Ren
on. I am not sure whether this is related with KafkaDStream or just DStream. Any help is appreciated. -- Hao Ren Data Engineer @ leboncoin Paris, France

Spark SQL reads all leaf directories on a partitioned Hive table

2019-08-07 Thread Hao Ren
ing `spark.read.parquet` API to read parquet files directly. Spark has partition-awareness for partitioned directories. But still, I would like to know if there is a way to leverage partition-awareness via Hive by using `spark.sql` API? Any help is highly appreciated! Thank you. -- Hao Ren
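
A small sketch of what is typically checked here: whether a predicate on the partition column is pushed down as a partition filter when going through `spark.sql`. The database, table, and partition column names are illustrative, and spark is assumed to be a SparkSession with Hive support.

    val q = spark.sql("""
      SELECT * FROM my_db.my_partitioned_table
      WHERE dt = '2019-08-01'     -- dt assumed to be the partition column
    """)
    q.explain(true)   // check whether the plan prunes partitions instead of listing every leaf directory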

Fwd: Spark SQL reads all leaf directories on a partitioned Hive table

2019-08-08 Thread Hao Ren
-- Forwarded message - From: Hao Ren Date: Thu, Aug 8, 2019 at 4:15 PM Subject: Re: Spark SQL reads all leaf directories on a partitioned Hive table To: Gourav Sengupta Hi Gourav, I am using enableHiveSupport. The table was not created by Spark. The table already exists in

Re: Spark SQL reads all leaf directories on a partitioned Hive table

2019-08-09 Thread Hao Ren
r any monetary damages arising from such loss, damage or destruction. On Thu, 8 Aug 2019 at 15:16, Hao Ren wrote: -- Forwarded message - From: Hao Ren Date: Thu, Aug 8, 2019 at 4:15 PM Subject

Re: Spark SQL reads all leaf directories on a partitioned Hive table

2019-08-14 Thread Hao Ren
> If anyone has a better explanation please let me know - I have the same question. Why does only parquet have this problem ? > Thanks > Subash > On Fri, 9 Aug 2019 at 16:18, Hao Ren wrote: >> Hi Mich, >> Thank you for your reply. >> I need to be more

Broadcast variable questions

2015-01-21 Thread Hao Ren
Hi, Spark 1.2.0, standalone, local mode (for test). Here are several questions on broadcast variables: 1) Where is the broadcast variable cached on executors ? In memory or on disk ? I read somewhere that these variables are stored in spark.local.dir, but I can't find any info in Spark 1.2
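
For context, the basic broadcast pattern these questions refer to; a minimal sketch with an illustrative lookup map.

    val lookup = Map("a" -> 1, "b" -> 2)                   // read-only data, shipped once per executor
    val bc = sc.broadcast(lookup)

    val rdd = sc.parallelize(Seq("a", "b", "c"))
    val mapped = rdd.map(x => bc.value.getOrElse(x, 0))    // executors read the local copy via bc.value
    bc.unpersist()                                         // release the cached copies when no longer needed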

HiveContext setConf seems not stable

2015-04-01 Thread Hao Ren
*Results:* (hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test) *You can reproduce this if you move to the latest branch-1.3 (1.3.1-snapshot, htag = 7d029cb1eb6f1df1bce1a3f5784fb7ce2f981a33)* *I have also tested the released 1.3.0 (htag = 4aaf48d46d13129f0f9bdafd771dd80fe568a7dc). It has the same problem.* *Please tell me if I am missing something. Any help is highly appreciated.* Hao -- Hao Ren {Data, Software} Engineer @ ClaraVista Paris, France

Re: HiveContext setConf seems not stable

2015-04-02 Thread Hao Ren
Hi, Jira created: https://issues.apache.org/jira/browse/SPARK-6675 Thank you. On Wed, Apr 1, 2015 at 7:50 PM, Michael Armbrust wrote: > Can you open a JIRA please? > > On Wed, Apr 1, 2015 at 9:38 AM, Hao Ren wrote: > >> Hi, >> >> I find HiveContext.setConf doe

The difference between Spark SQL/DataFrame join and RDD join

2015-04-07 Thread Hao Ren
Hi, We have 2 hive tables and want to join one with the other. Initially, we ran a SQL query on HiveContext, but it did not work: it was blocked on 30/600 tasks. Then we tried to load the tables into two DataFrames, and we encountered the same problem. Finally, it works with RDD.join. What we have

Re: The difference between Spark SQL/DataFrame join and RDD join

2015-04-08 Thread Hao Ren
r.scala:531) at org.apache.spark.deploy.worker.Worker.main(Worker.scala) "VM Thread" prio=10 tid=0x7f149407d000 nid=0xe75 runnable "GC task thread#0 (ParallelGC)" prio=10 tid=0x7f149401f000 nid=0xe6d runnable "GC task thread#1 (ParallelGC)" prio=10 tid=0x7

SQL can not create Hive database

2015-04-09 Thread Hao Ren
Hi, I am working in local mode. The following code hiveContext.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse") hiveContext.sql("create database if not exists db1") throws 15/04/09 13:53:16 ERROR RetryingHMSHandler: MetaException(message:Unable to create database path

why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-22 Thread Hao Ren
Hi, Just a quick question regarding the source code of groupByKey: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L453 In the end, it casts CompactBuffer to Iterable. But why? Any advantage? Thank you. Hao.

Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Hao Ren
Should I repost this to dev list ?

Re: sbt directory missed

2014-07-29 Thread Hao REN
What makes one confused is that spark-0.9.2-bin-hadoop1.tgz => contains source code and sbt; spark-1.0.1-bin-hadoop1.tgz => does not. According

Re: SparkSQL: select syntax

2014-10-14 Thread Hao Ren
Update: This syntax is mainly for avoiding retyping column names. Let's take the example in my previous post, where *a* is a table of 15 columns and *b* has 5 columns. After a join, I have a table of (15 + 5 - 1 (key in b)) = 19 columns and register the table in sqlContext. I don't want to actually
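
A sketch of the syntax under discussion: selecting all columns of one side without retyping them. Table and column names are illustrative; per the thread, HiveContext is the context to try.

    // a has 15 columns, b adds a few; a.* avoids retyping a's column names after the join.
    hiveContext.sql("SELECT a.*, b.extra_col FROM a JOIN b ON a.key = b.key")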

Re: SparkSQL: select syntax

2014-10-14 Thread Hao Ren
Thank you, Gen. I will give hiveContext a try. =)

SparkSQL: set hive.metastore.warehouse.dir in CLI doesn't work

2014-10-15 Thread Hao Ren
Hi, The following query in sparkSQL 1.1.0 CLI doesn't work. *SET hive.metastore.warehouse.dir=/home/spark/hive/warehouse ; create table test as select v1.*, v2.card_type, v2.card_upgrade_time_black, v2.card_upgrade_time_gold from customer v1 left join customer_loyalty v2 on v1.account_id = v2.ac

Understanding spark operation pipeline and block storage

2014-11-05 Thread Hao Ren
Hi, I would like to understand the pipeline of Spark's operations (transformations and actions) and some details on block storage. Let's consider the following code: val rdd1 = SparkContext.textFile("hdfs://...") rdd1.map(func1).map(func2).count For example, we have a file in hdfs of about 80Gb, alrea
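
A sketch of the pipelining point behind this question: both map calls are narrow transformations, so each task applies them in a single pass over its partition's iterator rather than materializing an intermediate RDD. func1 and func2 below are placeholders standing in for the functions from the message.

    def func1(s: String): String = s.trim     // placeholder
    def func2(s: String): Int = s.length      // placeholder

    val rdd1 = sc.textFile("hdfs://...")      // each task reads its own blocks of the large file

    // Conceptually, the chained maps behave like one pass over the iterator per partition:
    val counted = rdd1
      .mapPartitions(_.map(func1).map(func2))  // no intermediate RDD is materialized
      .count()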

Re: Understanding spark operation pipeline and block storage

2014-11-10 Thread Hao Ren
Hey guys, feel free to ask for more details if my questions are not clear. Any insight here ?

Building Spark with hive does not work

2014-11-17 Thread Hao Ren
Hi, I am building spark on the most recent master branch. I checked this page: https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md The cmd *./sbt/sbt -Phive -Phive-thirftserver clean assembly/assembly* works fine. A fat jar is created. However, when I started the SQL-CLI,

Re: Understanding spark operation pipeline and block storage

2014-11-17 Thread Hao Ren
Hi Cheng, You are right. =) I checked your article a few days ago. I have some further questions: According to the article, the following code has space complexity O(1). val lines = spark.textFile("hdfs://") val errors = lines.filter(_.startsWith("ERROR")) val messages = errors.map(_.sp

Re: Building Spark with hive does not work

2014-11-17 Thread Hao Ren
Hi Cheng, Actually, I am just using the commit 64c6b9b Here is HEAD of the master branch: $ git log commit 64c6b9bad559c21f25cd9fbe37c8813cdab939f2 Author: Michael Armbrust Date: Sun Nov 16 21:55:57 2014 -0800 [SPARK-4410][SQL] Add support for external sort Adds a new operator

Re: Building Spark with hive does not work

2014-11-17 Thread Hao Ren
Sorry for spamming. Just after my previous post, I noticed that the command I used was: ./sbt/sbt -Phive -Phive-thirftserver clean assembly/assembly. It should be *thriftserver*; the typo error is the evil. Stupid me. I believe I just copy-pasted it from somewhere else but never even checked it; meanwhile, no error ms

Re: Building Spark with hive does not work

2014-11-18 Thread Hao Ren
nvm, it would be better if the correctness of flags could be checked by sbt during the build.

Why is ALS class serializable ?

2014-11-19 Thread Hao Ren
Hi, When reading through ALS code, I find that: class ALS private ( private var numUserBlocks: Int, private var numProductBlocks: Int, private var rank: Int, private var iterations: Int, private var lambda: Double, private var implicitPrefs: Boolean, private var alpha:

Re: Understanding spark operation pipeline and block storage

2014-11-19 Thread Hao Ren
Anyone have an idea on this ? Thx

Re: Why is ALS class serializable ?

2014-11-21 Thread Hao Ren
It makes sense. Thx. =)

EC2 cluster with SSD ebs

2014-11-21 Thread Hao Ren
Hi, Is it possible to launch a Spark EC2 cluster with SSD EBS ? In spark-ec2.py, we can only specify the EBS size; the EBS type is always normal (Magnetic). I am using Spark-1.1.0. Thank you. Hao

Re: EC2 cluster with SSD ebs

2014-11-24 Thread Hao Ren
Hi, I found that the ec2 script has been improved a lot, and the option "ebs-vol-type" was added to specify the EBS type. However, it seems that the option does not work; the cmd I used is the following: $SPARK_HOME/ec2/spark-ec2 -k sparkcv -i spark.pem -m r3.4xlarge -s 3 -t r3.2xlarge --ebs-vol-typ

scala.MatchError on SparkSQL when creating ArrayType of StructType

2014-12-05 Thread Hao Ren
Hi, I am using SparkSQL on 1.1.0 branch. The following code leads to a scala.MatchError at org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247) val scm = StructType(inputRDD.schema.fields.init :+ StructField("list", ArrayType( StructType(

registerTempTable: Table not found

2014-12-09 Thread Hao Ren
Hi, I am using Spark SQL on 1.2.1-snapshot. Here is the problem I encountered. Basically, I want to save a schemaRDD to HiveContext val scm = StructType( Seq( StructField("name", StringType, nullable = false), StructField("cnt", IntegerType, nullable = false) )) val schR

Re: registerTempTable: Table not found

2014-12-09 Thread Hao Ren
Thank you.

SchemaRDD.sample problem

2014-12-17 Thread Hao Ren
Hi, I am using SparkSQL on the 1.2.1 branch. The problem comes from the following 4-line code: *val t1: SchemaRDD = hiveContext hql "select * from product where is_new = 0" val tb1: SchemaRDD = t1.sample(withReplacement = false, fraction = 0.05) tb1.registerTempTable("t1_tmp") (hiveContext sql "sele

SparkSQL 1.2.1-snapshot Left Join problem

2014-12-17 Thread Hao Ren
Hi, When running SparkSQL branch 1.2.1 on EC2 standalone cluster, the following query does not work: create table debug as select v1.* from t1 as v1 left join t2 as v2 on v1.sku = v2.sku where v2.sku is null Both t1 and t2 have 200 partitions. t1 has 10k rows, and t2 has 4k rows. this query bl

MLlib, classification label problem

2014-12-22 Thread Hao Ren
Hi, When going through the MLlib doc for classification: http://spark.apache.org/docs/latest/mllib-linear-methods.html, I find that the loss functions are based on label {1, -1}. But in MLlib, the loss functions on label {1, 0} are used. And there is a dataValidation check before fitting, if a la

Re: SchemaRDD.sample problem

2014-12-23 Thread Hao Ren
update: t1 is good. After collecting on t1, I find that all rows are ok (is_new = 0). Just after sampling, there are some rows where is_new = 1 which should have been filtered by the WHERE clause.

Re: SchemaRDD.sample problem

2014-12-23 Thread Hao Ren
One observation is that: if the fraction is big, say 50% - 80%, sampling is good and everything runs as expected. But if the fraction is small, for example 5%, the sampled data contains wrong rows which should have been filtered. The workaround is materializing t1 first: t1.cache t1.count These operations make
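
The workaround described in this message, written out against the Spark 1.2 SchemaRDD API used earlier in the thread:

    val t1 = hiveContext.hql("select * from product where is_new = 0")
    t1.cache()
    t1.count()                 // materialize t1 before sampling
    val tb1 = t1.sample(withReplacement = false, fraction = 0.05)
    tb1.registerTempTable("t1_tmp")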

Re: removing first record from RDD[String]

2014-12-23 Thread Hao Ren
Hi, I guess you would like to remove the header of a CSV file. You can play with partitions. =)

    // src is your RDD
    val noHeader = src.mapPartitionsWithIndex( (i, iterator) =>
      if (i == 0 && iterator.hasNext) {
        iterator.next
        iterator
      } else iterator)

Thus, you don't need to
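
An alternative sketch (not from the thread) that drops only the very first record of the RDD using zipWithIndex, regardless of partition layout:

    // Pairs each element with its global index, then drops index 0 (the header line).
    val noHeader2 = src.zipWithIndex()
      .filter { case (_, idx) => idx > 0 }
      .map { case (line, _) => line }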