Re: How to compile Spark with private build of Hadoop
I think you can set up your own Maven repository and deploy your modified Hadoop binary jars under your modified version number. Then you can add your repository to Spark's pom.xml and build with mvn -Dhadoop.version=<your-modified-version>. fightf...@163.com From: Lu, Yingqi Date: 2016-03-08 15:09 To: user@spark.apache.org Subject: How to compile Spark with private build of Hadoop Hi All, I am new to Spark and I have a question regarding compiling Spark. I modified the trunk version of the Hadoop source code. How can I compile Spark (standalone mode) with my modified version of Hadoop (HDFS, hadoop-common, etc.)? Thanks a lot for your help! Thanks, Lucy
Re: RE: How to compile Spark with private build of Hadoop
Hi there, you may try to use Nexus to set up a local Maven repository. I think this link would be helpful: http://www.sonatype.org/nexus/2015/02/27/setup-local-nexus-repository-and-deploy-war-file-from-maven/ After you have set up the repository, you can use the maven-deploy-plugin to deploy your customized Hadoop jars and their pom.xml files to the Nexus repository. Check this link for reference: https://books.sonatype.com/nexus-book/reference/staging-deployment.html fightf...@163.com From: Lu, Yingqi Date: 2016-03-08 15:23 To: fightf...@163.com; user Subject: RE: How to compile Spark with private build of Hadoop Thank you for the quick reply. I am very new to Maven and always use the default settings. Can you please be a little more specific in the instructions? I think all the jar files from the Hadoop build are located at Hadoop-3.0.0-SNAPSHOT/share/hadoop. Which ones do I need to use to compile Spark, and how can I change the pom.xml? Thanks, Lucy From: fightf...@163.com [mailto:fightf...@163.com] Sent: Monday, March 07, 2016 11:15 PM To: Lu, Yingqi ; user Subject: Re: How to compile Spark with private build of Hadoop I think you can set up your own Maven repository and deploy your modified Hadoop binary jars under your modified version number. Then you can add your repository to Spark's pom.xml and build with mvn -Dhadoop.version=<your-modified-version>. fightf...@163.com From: Lu, Yingqi Date: 2016-03-08 15:09 To: user@spark.apache.org Subject: How to compile Spark with private build of Hadoop Hi All, I am new to Spark and I have a question regarding compiling Spark. I modified the trunk version of the Hadoop source code. How can I compile Spark (standalone mode) with my modified version of Hadoop (HDFS, hadoop-common, etc.)? Thanks a lot for your help! Thanks, Lucy
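For reference, a minimal sketch of that workflow; the version string 3.0.0-SNAPSHOT matches the build mentioned above, while the jar path and the repository id/URL are placeholders for your own Nexus instance:

# deploy each rebuilt Hadoop artifact (repeat for hadoop-common, hadoop-hdfs, ...)
mvn deploy:deploy-file \
  -DgroupId=org.apache.hadoop -DartifactId=hadoop-common \
  -Dversion=3.0.0-SNAPSHOT -Dpackaging=jar \
  -Dfile=share/hadoop/common/hadoop-common-3.0.0-SNAPSHOT.jar \
  -DrepositoryId=my-nexus \
  -Durl=http://nexus.example.com/content/repositories/snapshots/

# build Spark against the custom Hadoop version
# (the same repository must be declared in Spark's pom.xml or in ~/.m2/settings.xml)
mvn -Dhadoop.version=3.0.0-SNAPSHOT -DskipTests clean package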
Re: RE: Spark assembly in Maven repo?
Using Maven to download the assembly jar is fine. I would recommend deploying this assembly jar to your local Maven repo, e.g. a Nexus repo, or more likely a snapshot repository. fightf...@163.com From: Xiaoyong Zhu Date: 2015-12-11 15:10 To: Jeff Zhang CC: user@spark.apache.org; Zhaomin Xu; Joe Zhang (SDE) Subject: RE: Spark assembly in Maven repo? Sorry – I didn’t make it clear. It’s actually not a “dependency” – we are building a certain plugin for IntelliJ where we want to distribute this jar. But since the jar is updated frequently, we don't want to distribute it together with our plugin; we would rather download it via Maven. In this case what’s the recommended way? Xiaoyong From: Jeff Zhang [mailto:zjf...@gmail.com] Sent: Thursday, December 10, 2015 11:03 PM To: Xiaoyong Zhu Cc: user@spark.apache.org Subject: Re: Spark assembly in Maven repo? I don't think making the assembly jar a dependency is good practice. You may hit jar hell issues in that case. On Fri, Dec 11, 2015 at 2:46 PM, Xiaoyong Zhu wrote: Hi Experts, We have a project which has a dependency on the following jar: spark-assembly--hadoop.jar, for example spark-assembly-1.4.1.2.3.3.0-2983-hadoop2.7.1.2.3.3.0-2983.jar. Since this assembly might be updated in the future, I am not sure if there is a Maven repo that has the above Spark assembly jar? Or should we create & upload it to Maven Central? Thanks! Xiaoyong -- Best Regards Jeff Zhang
Re: Re: Spark assembly in Maven repo?
I agree with you that the assembly jar is not good to publish. However, what he really needs is a way to fetch an updatable Maven jar file. fightf...@163.com From: Mark Hamstra Date: 2015-12-11 15:34 To: fightf...@163.com CC: Xiaoyong Zhu; Jeff Zhang; user; Zhaomin Xu; Joe Zhang (SDE) Subject: Re: RE: Spark assembly in Maven repo? No, publishing a Spark assembly jar is not fine. See the doc attached to https://issues.apache.org/jira/browse/SPARK-11157 and be aware that a likely goal of Spark 2.0 will be the elimination of assemblies. On Thu, Dec 10, 2015 at 11:19 PM, fightf...@163.com wrote: Using Maven to download the assembly jar is fine. I would recommend deploying this assembly jar to your local Maven repo, e.g. a Nexus repo, or more likely a snapshot repository. fightf...@163.com From: Xiaoyong Zhu Date: 2015-12-11 15:10 To: Jeff Zhang CC: user@spark.apache.org; Zhaomin Xu; Joe Zhang (SDE) Subject: RE: Spark assembly in Maven repo? Sorry – I didn’t make it clear. It’s actually not a “dependency” – we are building a certain plugin for IntelliJ where we want to distribute this jar. But since the jar is updated frequently, we don't want to distribute it together with our plugin; we would rather download it via Maven. In this case what’s the recommended way? Xiaoyong From: Jeff Zhang [mailto:zjf...@gmail.com] Sent: Thursday, December 10, 2015 11:03 PM To: Xiaoyong Zhu Cc: user@spark.apache.org Subject: Re: Spark assembly in Maven repo? I don't think making the assembly jar a dependency is good practice. You may hit jar hell issues in that case. On Fri, Dec 11, 2015 at 2:46 PM, Xiaoyong Zhu wrote: Hi Experts, We have a project which has a dependency on the following jar: spark-assembly--hadoop.jar, for example spark-assembly-1.4.1.2.3.3.0-2983-hadoop2.7.1.2.3.3.0-2983.jar. Since this assembly might be updated in the future, I am not sure if there is a Maven repo that has the above Spark assembly jar? Or should we create & upload it to Maven Central? Thanks! Xiaoyong -- Best Regards Jeff Zhang
Re: How can I get the column data based on a specific column name and then store it in an array or list?
Emm... I think you can do a df.map and store each column value in your list. fightf...@163.com From: zml张明磊 Date: 2015-12-25 15:33 To: user@spark.apache.org CC: dev-subscr...@spark.apache.org Subject: How can I get the column data based on a specific column name and then store it in an array or list? Hi, I am new to Scala and Spark and am trying to find the relevant DataFrame API to solve the problem described in the title. However, I have only found the API DataFrame.col(colName : String) : Column, which returns a Column object, not the content. An API like Column.toArray : Type would be enough for me, but DataFrame does not support it. How can I achieve this? Thanks, Minglei.
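For reference, a minimal sketch of that approach with the Spark 1.x DataFrame API; df stands for the DataFrame in question and "colName" is a placeholder column name:

// collect the values of one column into a local array on the driver
val values: Array[String] = df.select("colName").map(_.getString(0)).collect()

// equivalent route through the underlying RDD, using getAs for a non-string column
val values2 = df.select("colName").rdd.map(_.getAs[Long](0)).collect()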
Re: Spark 1.5.2 compatible spark-cassandra-connector
Hi, Vivek M, I have tried the 1.5.x spark-cassandra-connector and indeed encountered some classpath issues, mainly with the guava dependency. I believe that can be solved by some Maven config, but I have not tried that yet. Best, Sun. fightf...@163.com From: vivek.meghanat...@wipro.com Date: 2015-12-29 20:40 To: user@spark.apache.org Subject: Spark 1.5.2 compatible spark-cassandra-connector All, What is the compatible spark-cassandra-connector for Spark 1.5.2? I can only find the latest connector version spark-cassandra-connector_2.10-1.5.0-M3, which has a dependency on Spark 1.5.1. Can we use the same for 1.5.2? Are there any classpath issues that need to be handled, or any jars that need to be excluded while packaging the application jar? http://central.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.10/1.5.0-M3/spark-cassandra-connector_2.10-1.5.0-M3.pom Regards, Vivek M
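For reference, a sketch of how that dependency could be declared in sbt; the coordinates come from the Maven link above, and the notes about 1.5.2 compatibility and guava are assumptions to verify against your own classpath:

// build.sbt -- connector 1.5.0-M3 is built against Spark 1.5.1 and generally also runs on 1.5.2
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "1.5.2" % "provided",
  "org.apache.spark"   %% "spark-sql"                 % "1.5.2" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M3"
)
// if guava version conflicts show up at runtime, shading guava in an assembly jar is one way out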
spark dataframe read large mysql table running super slow
Hi, recently I have been planning to use Spark SQL to run some tests over a large MySQL table, and I am trying to compare the performance between Spark and MyCat. However, the load is super slow and I hope someone can help tune this. Environment: Spark 1.4.1 Code snippet: val prop = new java.util.Properties prop.setProperty("user","root") prop.setProperty("password", "123456") val url1 = "jdbc:mysql://localhost:3306/db1" val jdbcDF = sqlContext.read.jdbc(url1,"video",prop) jdbcDF.registerTempTable("video_test") sqlContext.sql("select count(1) from video_test").show() Overall, the load process gets stuck and hits a connection timeout. The MySQL table holds about 100 million records. I would be happy to provide more info if useful. Best, Sun. fightf...@163.com
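As a side note, for a plain count it can help to push the aggregation down to MySQL instead of pulling the whole table through a single JDBC partition. A sketch with the same connection settings as above (the subquery-as-table form is supported by the JDBC data source):

// let MySQL compute the count; Spark only fetches a single row
val countDF = sqlContext.read.jdbc(url1, "(select count(1) as cnt from video) as t", prop)
countDF.show()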
spark dataframe jdbc read/write using dbcp connection pool
Hi, I want to load really large-volume datasets from MySQL using the Spark DataFrame API, and then save them as Parquet or ORC files to work with Hive / Impala. The dataset is about 1 billion records, and when I use the following naive code to run that, errors occur and executors are lost. val prop = new java.util.Properties prop.setProperty("user","test") prop.setProperty("password", "test") val url1 = "jdbc:mysql://172.16.54.136:3306/db1" val url2 = "jdbc:mysql://172.16.54.138:3306/db1" val jdbcDF1 = sqlContext.read.jdbc(url1,"video",prop) val jdbcDF2 = sqlContext.read.jdbc(url2,"video",prop) val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2) jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf") I can see the following message in the executor log. The log shows that the wait_timeout threshold was reached and there is no retry mechanism in the code path. So I am asking you experts to help with tuning this. Or should I try to use a JDBC connection pool to increase parallelism? 16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 377,769 milliseconds ago. The last packet sent successfully to the server was 377,790 milliseconds ago. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 1 bytes before connection was unexpectedly lost. at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914) at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996) ... 22 more 16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4 16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection 16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 (TID 2) com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure fightf...@163.com
Re: Re: spark dataframe jdbc read/write using dbcp connection pool
Hi, thanks a lot for your suggestion. I then tried the following code: val prop = new java.util.Properties prop.setProperty("user","test") prop.setProperty("password", "test") prop.setProperty("partitionColumn", "added_year") prop.setProperty("lowerBound", "1985") prop.setProperty("upperBound","2015") prop.setProperty("numPartitions", "200") val url1 = "jdbc:mysql://172.16.54.136:3306/db1" val url2 = "jdbc:mysql://172.16.54.138:3306/db1" val jdbcDF1 = sqlContext.read.jdbc(url1,"video3",prop) val jdbcDF2 = sqlContext.read.jdbc(url2,"video3",prop) val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2) jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf4") The added_year column in the MySQL table covers the range 1985-2015, and I pass the numPartitions property for partitioning purposes. Is this what you recommend? Can you advise a little more on the implementation? Best, Sun. fightf...@163.com From: 刘虓 Date: 2016-01-20 11:26 To: fightf...@163.com CC: user Subject: Re: spark dataframe jdbc read/write using dbcp connection pool Hi, I suggest you partition the JDBC reading on an indexed column of the MySQL table 2016-01-20 10:11 GMT+08:00 fightf...@163.com : Hi, I want to load really large-volume datasets from MySQL using the Spark DataFrame API, and then save them as Parquet or ORC files to work with Hive / Impala. The dataset is about 1 billion records, and when I use the following naive code to run that, errors occur and executors are lost. val prop = new java.util.Properties prop.setProperty("user","test") prop.setProperty("password", "test") val url1 = "jdbc:mysql://172.16.54.136:3306/db1" val url2 = "jdbc:mysql://172.16.54.138:3306/db1" val jdbcDF1 = sqlContext.read.jdbc(url1,"video",prop) val jdbcDF2 = sqlContext.read.jdbc(url2,"video",prop) val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2) jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf") I can see the following message in the executor log. The log shows that the wait_timeout threshold was reached and there is no retry mechanism in the code path. So I am asking you experts to help with tuning this. Or should I try to use a JDBC connection pool to increase parallelism? 16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 377,769 milliseconds ago. The last packet sent successfully to the server was 377,790 milliseconds ago. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 1 bytes before connection was unexpectedly lost. at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914) at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996) ... 22 more 16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4 16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection 16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 (TID 2) com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure fightf...@163.com
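One thing to note: with the jdbc(url, table, properties) overload used above, partitionColumn / lowerBound / upperBound / numPartitions set in the Properties object are passed through as plain connection properties and do not trigger a partitioned read, so the scan still runs in a single partition per table. The overload that takes the partition column explicitly does split the read; a sketch with the values from the snippet above:

// explicit partitioning: split the scan on added_year over [1985, 2015]
val jdbcDF1 = sqlContext.read.jdbc(
  url1, "video3",
  "added_year",  // partition column (ideally indexed)
  1985L,         // lower bound
  2015L,         // upper bound
  30,            // number of partitions
  prop)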
Re: Re: spark dataframe jdbc read/write using dbcp connection pool
OK. I am trying to use the jdbc read data source with predicates, like the following: sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props) I can see that the task goes to 62 partitions. But I still get the exception and the Parquet file was not written successfully. Do I need to increase the number of partitions? Or are there any other alternatives I can choose to tune this? Best, Sun. fightf...@163.com From: fightf...@163.com Date: 2016-01-20 15:06 To: 刘虓 CC: user Subject: Re: Re: spark dataframe jdbc read/write using dbcp connection pool Hi, thanks a lot for your suggestion. I then tried the following code: val prop = new java.util.Properties prop.setProperty("user","test") prop.setProperty("password", "test") prop.setProperty("partitionColumn", "added_year") prop.setProperty("lowerBound", "1985") prop.setProperty("upperBound","2015") prop.setProperty("numPartitions", "200") val url1 = "jdbc:mysql://172.16.54.136:3306/db1" val url2 = "jdbc:mysql://172.16.54.138:3306/db1" val jdbcDF1 = sqlContext.read.jdbc(url1,"video3",prop) val jdbcDF2 = sqlContext.read.jdbc(url2,"video3",prop) val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2) jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf4") The added_year column in the MySQL table covers the range 1985-2015, and I pass the numPartitions property for partitioning purposes. Is this what you recommend? Can you advise a little more on the implementation? Best, Sun. fightf...@163.com From: 刘虓 Date: 2016-01-20 11:26 To: fightf...@163.com CC: user Subject: Re: spark dataframe jdbc read/write using dbcp connection pool Hi, I suggest you partition the JDBC reading on an indexed column of the MySQL table 2016-01-20 10:11 GMT+08:00 fightf...@163.com : Hi, I want to load really large-volume datasets from MySQL using the Spark DataFrame API, and then save them as Parquet or ORC files to work with Hive / Impala. The dataset is about 1 billion records, and when I use the following naive code to run that, errors occur and executors are lost. val prop = new java.util.Properties prop.setProperty("user","test") prop.setProperty("password", "test") val url1 = "jdbc:mysql://172.16.54.136:3306/db1" val url2 = "jdbc:mysql://172.16.54.138:3306/db1" val jdbcDF1 = sqlContext.read.jdbc(url1,"video",prop) val jdbcDF2 = sqlContext.read.jdbc(url2,"video",prop) val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2) jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf") I can see the following message in the executor log. The log shows that the wait_timeout threshold was reached and there is no retry mechanism in the code path. So I am asking you experts to help with tuning this. Or should I try to use a JDBC connection pool to increase parallelism? 16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 377,769 milliseconds ago. The last packet sent successfully to the server was 377,790 milliseconds ago. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 1 bytes before connection was unexpectedly lost. at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914) at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996) ...
22 more 16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4 16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection 16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 (TID 2) com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure fightf...@163.com
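For reference, a sketch of how a finer-grained predicate array could be generated so that each partition scans only a narrow slice (here one added_year value per partition; adjust the ranges to your data distribution):

// one predicate, and hence one partition, per year
val predicates = (1985 to 2015).map(y => s"added_year = $y").toArray
val jdbcDF = sqlContext.read.jdbc(url1, "video3", predicates, prop)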
Re: Re: spark dataframe jdbc read/write using dbcp connection pool
OK. I can see from the Spark job UI that more partitions are actually used when I use predicates. But each task then fails with the same error message: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 377,769 milliseconds ago. The last packet sent successfully to the server was 377,790 milliseconds ago. Do I need to increase the number of partitions? Or shall I write a Parquet file for each partition in an iterative way? Thanks a lot for your advice. Best, Sun. fightf...@163.com From: 刘虓 Date: 2016-01-20 18:31 To: fightf...@163.com CC: user Subject: Re: Re: spark dataframe jdbc read/write using dbcp connection pool Hi, I think you can view the Spark job UI to find out whether the partitioning works or not; pay attention to the storage page for the partition sizes and to which stage / task fails 2016-01-20 16:25 GMT+08:00 fightf...@163.com : OK. I am trying to use the jdbc read data source with predicates, like the following: sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props) I can see that the task goes to 62 partitions. But I still get the exception and the Parquet file was not written successfully. Do I need to increase the number of partitions? Or are there any other alternatives I can choose to tune this? Best, Sun. fightf...@163.com From: fightf...@163.com Date: 2016-01-20 15:06 To: 刘虓 CC: user Subject: Re: Re: spark dataframe jdbc read/write using dbcp connection pool Hi, thanks a lot for your suggestion. I then tried the following code: val prop = new java.util.Properties prop.setProperty("user","test") prop.setProperty("password", "test") prop.setProperty("partitionColumn", "added_year") prop.setProperty("lowerBound", "1985") prop.setProperty("upperBound","2015") prop.setProperty("numPartitions", "200") val url1 = "jdbc:mysql://172.16.54.136:3306/db1" val url2 = "jdbc:mysql://172.16.54.138:3306/db1" val jdbcDF1 = sqlContext.read.jdbc(url1,"video3",prop) val jdbcDF2 = sqlContext.read.jdbc(url2,"video3",prop) val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2) jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf4") The added_year column in the MySQL table covers the range 1985-2015, and I pass the numPartitions property for partitioning purposes. Is this what you recommend? Can you advise a little more on the implementation? Best, Sun. fightf...@163.com From: 刘虓 Date: 2016-01-20 11:26 To: fightf...@163.com CC: user Subject: Re: spark dataframe jdbc read/write using dbcp connection pool Hi, I suggest you partition the JDBC reading on an indexed column of the MySQL table 2016-01-20 10:11 GMT+08:00 fightf...@163.com : Hi, I want to load really large-volume datasets from MySQL using the Spark DataFrame API, and then save them as Parquet or ORC files to work with Hive / Impala. The dataset is about 1 billion records, and when I use the following naive code to run that, errors occur and executors are lost. val prop = new java.util.Properties prop.setProperty("user","test") prop.setProperty("password", "test") val url1 = "jdbc:mysql://172.16.54.136:3306/db1" val url2 = "jdbc:mysql://172.16.54.138:3306/db1" val jdbcDF1 = sqlContext.read.jdbc(url1,"video",prop) val jdbcDF2 = sqlContext.read.jdbc(url2,"video",prop) val jdbcDF3 = jdbcDF1.unionAll(jdbcDF2) jdbcDF3.write.format("parquet").save("hdfs://172.16.54.138:8020/perf") I can see the following message in the executor log.
The log shows that the wait_timeout threshold was reached and there is no retry mechanism in the code path. So I am asking you experts to help with tuning this. Or should I try to use a JDBC connection pool to increase parallelism? 16/01/19 17:04:28 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 377,769 milliseconds ago. The last packet sent successfully to the server was 377,790 milliseconds ago. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 1 bytes before connection was unexpectedly lost. at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2914) at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:1996) ... 22 more 16/01/19 17:10:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4 16/01/19 17:10:47 INFO jdbc.JDBCRDD: closed connection 16/01/19 17:10:47 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 (TID 2) com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure fightf...@163.com
clear cache using spark sql cli
Hi, how can I clear the cache (i.e. execute a SQL query without any cache) using the spark-sql CLI? Is there any command available? Best, Sun. fightf...@163.com
Re: Re: clear cache using spark sql cli
Hi Ted, yes, I had seen that issue. But it seems that in the spark-sql CLI I cannot run a command like sqlContext.clearCache(). Is this right? In the spark-sql CLI I can only run SQL queries, so I want to see if there are any available options to achieve this. Best, Sun. fightf...@163.com From: Ted Yu Date: 2016-02-04 11:22 To: fightf...@163.com CC: user Subject: Re: clear cache using spark sql cli Have you looked at SPARK-5909 (Add a clearCache command to Spark SQL's cache manager)? On Wed, Feb 3, 2016 at 7:16 PM, fightf...@163.com wrote: Hi, how can I clear the cache (i.e. execute a SQL query without any cache) using the spark-sql CLI? Is there any command available? Best, Sun. fightf...@163.com
Re: Re: clear cache using spark sql cli
No, that is not my case. Actually I am running spark-sql, i.e. the spark-sql CLI mode, and executing SQL queries against my Hive tables. In the spark-sql CLI there seems to be no existing sqlContext or sparkContext; I can only run select/create/insert/delete operations. Best, Sun. fightf...@163.com From: Ted Yu Date: 2016-02-04 11:49 To: fightf...@163.com CC: user Subject: Re: Re: clear cache using spark sql cli In spark-shell, I can do: scala> sqlContext.clearCache() Is that not the case for you? On Wed, Feb 3, 2016 at 7:35 PM, fightf...@163.com wrote: Hi Ted, yes, I had seen that issue. But it seems that in the spark-sql CLI I cannot run a command like sqlContext.clearCache(). Is this right? In the spark-sql CLI I can only run SQL queries, so I want to see if there are any available options to achieve this. Best, Sun. fightf...@163.com From: Ted Yu Date: 2016-02-04 11:22 To: fightf...@163.com CC: user Subject: Re: clear cache using spark sql cli Have you looked at SPARK-5909 (Add a clearCache command to Spark SQL's cache manager)? On Wed, Feb 3, 2016 at 7:16 PM, fightf...@163.com wrote: Hi, how can I clear the cache (i.e. execute a SQL query without any cache) using the spark-sql CLI? Is there any command available? Best, Sun. fightf...@163.com
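For reference, the cache-related statements that the spark-sql CLI accepts as plain SQL; CLEAR CACHE is the statement introduced by SPARK-5909, my_table is a placeholder, and availability should be checked against your Spark version:

CACHE TABLE my_table;    -- cache a table in the in-memory columnar store
UNCACHE TABLE my_table;  -- evict a single table from the cache
CLEAR CACHE;             -- evict everything from the cache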
About cache table performance in spark sql
Hi, I want to make sure that caching a table would indeed accelerate SQL queries. Here is one of my use cases: Impala table size 24.59GB, no partitions, with about 1 billion+ rows. I use sqlContext.sql to run queries over this table and try the cache and uncache commands to see if there is any performance disparity. I ran the following query: select * from video1203 where id > 10 and id < 20 and added_year != 1989 I can see the following results: 1. If I do not cache the table and just run sqlContext.sql(), the above query runs in about 25 seconds. 2. If I first run sqlContext.cacheTable("video1203"), the query runs super slow and can cause a driver OOM exception, but I can get final results after about 9 minutes. Could any expert explain this for me? I can see that cacheTable causes OOM just because the in-memory columnar storage cannot hold the 24.59GB+ table in memory. But why is the performance so different and even so bad? Best, Sun. fightf...@163.com
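For reference, a sketch of the caching workflow being measured, together with the columnar-cache settings that influence how much memory it needs (the values shown are the defaults; the table and query come from the post above):

// in-memory columnar cache settings (defaults shown)
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")

sqlContext.cacheTable("video1203")  // lazy: the cache is built on the first scan
sqlContext.sql("select count(1) from video1203").show()  // materializes the cache
sqlContext.sql("select * from video1203 where id > 10 and id < 20 and added_year != 1989").show()
sqlContext.uncacheTable("video1203")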
Re: Re: About cache table performance in spark sql
Hi, thanks a lot for your explanation. I know that the slowness is mainly caused by GC pressure, and I now understand this difference from your advice. I gave each executor 6GB of memory and tried to cache the table. I had 3 executors, and I can see some info on the Spark job UI storage page, like the following: RDD Name: In-memory table video1203, Storage Level: Memory Deserialized 1x Replicated, Cached Partitions: 251, Fraction Cached: 100%, Size in Memory: 18.1 GB, Size in ExternalBlockStore: 0.0 B, Size on Disk: 23.6 GB. I can see that Spark SQL tries to cache the data in memory, and when I ran subsequent queries over this table video1203, I got fast responses. Another thing that confuses me is the above data sizes (in memory and on disk). I can see that the in-memory data size is 18.1GB, which almost equals the sum of my executor memory. But why is the disk size 23.6GB? From Impala I see the overall Parquet file size is about 24.59GB. It would be good to have some clarification on this. Best, Sun. fightf...@163.com From: Prabhu Joseph Date: 2016-02-04 14:35 To: fightf...@163.com CC: user Subject: Re: About cache table performance in spark sql Sun, when an executor doesn't have enough memory and tries to cache the data, it spends a lot of time on GC and hence the job will be slow. Either 1. allocate enough memory to cache all RDDs, so the job will complete fast, or 2. don't use cache when there is not enough executor memory. To check the GC time, use --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" while submitting the job, and SPARK_WORKER_DIR will have the sysout with GC. The sysout will show many "Full GC" events happening when cache is used and the executor does not have enough heap. Thanks, Prabhu Joseph On Thu, Feb 4, 2016 at 11:25 AM, fightf...@163.com wrote: Hi, I want to make sure that caching a table would indeed accelerate SQL queries. Here is one of my use cases: Impala table size 24.59GB, no partitions, with about 1 billion+ rows. I use sqlContext.sql to run queries over this table and try the cache and uncache commands to see if there is any performance disparity. I ran the following query: select * from video1203 where id > 10 and id < 20 and added_year != 1989 I can see the following results: 1. If I do not cache the table and just run sqlContext.sql(), the above query runs in about 25 seconds. 2. If I first run sqlContext.cacheTable("video1203"), the query runs super slow and can cause a driver OOM exception, but I can get final results after about 9 minutes. Could any expert explain this for me? I can see that cacheTable causes OOM just because the in-memory columnar storage cannot hold the 24.59GB+ table in memory. But why is the performance so different and even so bad? Best, Sun. fightf...@163.com
Re: Re: About cache table performance in spark sql
Oh, thanks. Makes sense to me. Best, Sun. fightf...@163.com From: Takeshi Yamamuro Date: 2016-02-04 16:01 To: fightf...@163.com CC: user Subject: Re: Re: About cache table performance in spark sql Hi, Parquet data are column-wise and highly compressed, so the size of the deserialized rows in Spark can be bigger than that of the Parquet data on disk. That is, I think that 24.59GB of Parquet data becomes (18.1GB + 23.6GB) of data in Spark. Cached data in Spark are also compressed by default, but Spark uses simpler compression algorithms than Parquet does, and ISTM the compression ratios are typically worse than those of Parquet. On Thu, Feb 4, 2016 at 3:16 PM, fightf...@163.com wrote: Hi, thanks a lot for your explanation. I know that the slowness is mainly caused by GC pressure, and I now understand this difference from your advice. I gave each executor 6GB of memory and tried to cache the table. I had 3 executors, and I can see some info on the Spark job UI storage page, like the following: RDD Name: In-memory table video1203, Storage Level: Memory Deserialized 1x Replicated, Cached Partitions: 251, Fraction Cached: 100%, Size in Memory: 18.1 GB, Size in ExternalBlockStore: 0.0 B, Size on Disk: 23.6 GB. I can see that Spark SQL tries to cache the data in memory, and when I ran subsequent queries over this table video1203, I got fast responses. Another thing that confuses me is the above data sizes (in memory and on disk). I can see that the in-memory data size is 18.1GB, which almost equals the sum of my executor memory. But why is the disk size 23.6GB? From Impala I see the overall Parquet file size is about 24.59GB. It would be good to have some clarification on this. Best, Sun. fightf...@163.com From: Prabhu Joseph Date: 2016-02-04 14:35 To: fightf...@163.com CC: user Subject: Re: About cache table performance in spark sql Sun, when an executor doesn't have enough memory and tries to cache the data, it spends a lot of time on GC and hence the job will be slow. Either 1. allocate enough memory to cache all RDDs, so the job will complete fast, or 2. don't use cache when there is not enough executor memory. To check the GC time, use --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" while submitting the job, and SPARK_WORKER_DIR will have the sysout with GC. The sysout will show many "Full GC" events happening when cache is used and the executor does not have enough heap. Thanks, Prabhu Joseph On Thu, Feb 4, 2016 at 11:25 AM, fightf...@163.com wrote: Hi, I want to make sure that caching a table would indeed accelerate SQL queries. Here is one of my use cases: Impala table size 24.59GB, no partitions, with about 1 billion+ rows. I use sqlContext.sql to run queries over this table and try the cache and uncache commands to see if there is any performance disparity. I ran the following query: select * from video1203 where id > 10 and id < 20 and added_year != 1989 I can see the following results: 1. If I do not cache the table and just run sqlContext.sql(), the above query runs in about 25 seconds. 2. If I first run sqlContext.cacheTable("video1203"), the query runs super slow and can cause a driver OOM exception, but I can get final results after about 9 minutes. Could any expert explain this for me? I can see that cacheTable causes OOM just because the in-memory columnar storage cannot hold the 24.59GB+ table in memory. But why is the performance so different and even so bad? Best, Sun. fightf...@163.com -- --- Takeshi Yamamuro
Re: spark 1.6 Not able to start spark
I think this may be a permission issue. Check your Spark conf for the Hadoop-related settings. fightf...@163.com From: Arunkumar Pillai Date: 2016-02-23 14:08 To: user Subject: spark 1.6 Not able to start spark Hi, when I try to start spark-shell I'm getting the following error: Exception in thread "main" java.lang.RuntimeException: java.lang.reflect.InvocationTargetException at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131) at org.apache.hadoop.security.Groups.(Groups.java:70) . What could be the reason for this error? -- Thanks and Regards Arun
Re: spark to hbase
Hi, I notice that you configured the following: configuration.set("hbase.master", "192.168.1:6"); Did you mistype the host IP? Best, Sun. fightf...@163.com From: jinhong lu Date: 2015-10-27 17:22 To: spark users Subject: spark to hbase Hi, I write my result to HDFS and it works well: val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" + a.positiveCount)).saveAsTextFile(modelDataPath); But when I want to write to HBase, the application hangs: no log, no response, it just stays there, and nothing is written to HBase: val model = lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new TrainFeature())(seqOp, combOp).values.foreach({ res => val configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.property.clientPort", "2181"); configuration.set("hbase.zookeeper.quorum", "192.168.1.66"); configuration.set("hbase.master", "192.168.1:6"); val hadmin = new HBaseAdmin(configuration); val table = new HTable(configuration, "ljh_test3"); var put = new Put(Bytes.toBytes(res.toKey())); put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.totalCount + res.positiveCount)); table.put(put); table.flushCommits() }) And then I tried to write some simple data to HBase, and that worked too: sc.parallelize(Array(1,2,3,4)).foreach({ res => val configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.property.clientPort", "2181"); configuration.set("hbase.zookeeper.quorum", "192.168.1.66"); configuration.set("hbase.master", "192.168.1:6"); val hadmin = new HBaseAdmin(configuration); val table = new HTable(configuration, "ljh_test3"); var put = new Put(Bytes.toBytes(res)); put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res)); table.put(put); table.flushCommits() }) What is the problem with the 2nd code? Thanks a lot.
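Besides checking the hbase.master value (a client usually does not even need it when the ZooKeeper quorum is set correctly), it may also help to open one HBase connection per partition instead of one per record. A sketch reusing the same client API, IPs, and table name as the snippet above, so treat those values as ones to verify:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

model.foreachPartition { iter =>
  // one configuration / table handle per partition, not per record
  val configuration = HBaseConfiguration.create()
  configuration.set("hbase.zookeeper.property.clientPort", "2181")
  configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
  val table = new HTable(configuration, "ljh_test3")
  iter.foreach { res =>
    val put = new Put(Bytes.toBytes(res.toKey()))
    put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res.totalCount + res.positiveCount))
    table.put(put)
  }
  table.flushCommits()
  table.close()
}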
OLAP query using spark dataframe with cassandra
Hi, community, we are especially interested in this integration, based on some slides from [1]. The SMACK stack (Spark + Mesos + Akka + Cassandra + Kafka) seems to be a good implementation of the lambda architecture in the open-source world, especially for non-Hadoop-based cluster environments. As we can see, the advantages obviously consist of: 1. the feasibility and scalability of the Spark DataFrame API, which can also be a perfect complement to Apache Cassandra's native CQL features. 2. both streaming and batch processing are available using the all-in-one stack, which is cool. 3. we can achieve both compactness and usability for Spark with Cassandra, including seamless integration with job scheduling and resource management. Our only concern is the OLAP query performance issue, which is mainly caused by frequent aggregation work over daily-growing large tables, for both Spark SQL and Cassandra. I can see that the use case in [1] uses FiloDB to achieve columnar storage and query performance, but we have no further knowledge of it. The question is: does anyone have such a use case for now, especially in a production environment? We would be interested in your architecture for designing this OLAP engine using Spark + Cassandra. How do you think this scenario compares with a traditional OLAP cube design, like Apache Kylin or Pentaho Mondrian? Best Regards, Sun. [1] http://www.slideshare.net/planetcassandra/cassandra-summit-2014-interactive-olap-queries-using-apache-cassandra-and-spark fightf...@163.com
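For context, a minimal sketch of what the Spark DataFrame side of such a stack looks like when reading Cassandra through the spark-cassandra-connector; the keyspace, table, and column names here are placeholders:

// read a Cassandra table as a DataFrame and run an aggregation over it
val events = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "events"))
  .load()

events.groupBy("day").count().show()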
Re: Re: OLAP query using spark dataframe with cassandra
Hi, thanks for the suggestion. Actually we are now evaluating and stress-testing Spark SQL on Cassandra, while trying to define the business models. FWIW, the solution mentioned here is different from a traditional OLAP cube engine, right? So we are hesitating over the direction to choose for the OLAP architecture, and we are happy to hear more use cases from this community. Best, Sun. fightf...@163.com From: Jörn Franke Date: 2015-11-09 14:40 To: fightf...@163.com CC: user; dev Subject: Re: OLAP query using spark dataframe with cassandra Is there any distributor supporting these software components in combination? If not, and your core business is not software, then you may want to look for something else, because it might not make sense to build up internal know-how in all of these areas. In any case - it depends highly on your data and queries. You will have to do your own experiments. On 09 Nov 2015, at 07:02, "fightf...@163.com" wrote: Hi, community, we are especially interested in this integration, based on some slides from [1]. The SMACK stack (Spark + Mesos + Akka + Cassandra + Kafka) seems to be a good implementation of the lambda architecture in the open-source world, especially for non-Hadoop-based cluster environments. As we can see, the advantages obviously consist of: 1. the feasibility and scalability of the Spark DataFrame API, which can also be a perfect complement to Apache Cassandra's native CQL features. 2. both streaming and batch processing are available using the all-in-one stack, which is cool. 3. we can achieve both compactness and usability for Spark with Cassandra, including seamless integration with job scheduling and resource management. Our only concern is the OLAP query performance issue, which is mainly caused by frequent aggregation work over daily-growing large tables, for both Spark SQL and Cassandra. I can see that the use case in [1] uses FiloDB to achieve columnar storage and query performance, but we have no further knowledge of it. The question is: does anyone have such a use case for now, especially in a production environment? We would be interested in your architecture for designing this OLAP engine using Spark + Cassandra. How do you think this scenario compares with a traditional OLAP cube design, like Apache Kylin or Pentaho Mondrian? Best Regards, Sun. [1] http://www.slideshare.net/planetcassandra/cassandra-summit-2014-interactive-olap-queries-using-apache-cassandra-and-spark fightf...@163.com
Re: Re: OLAP query using spark dataframe with cassandra
Hi, in my experience I would recommend option 3), using Apache Kylin, for your requirements. This is a suggestion based on the open-source world. As for the Cassandra part, I accept your advice about the dedicated support it needs, but the community is very open and convenient for prompt responses. fightf...@163.com From: tsh Date: 2015-11-10 02:56 To: fightf...@163.com; user; dev Subject: Re: OLAP query using spark dataframe with cassandra Hi, I'm in the same position right now: we are going to implement something like OLAP BI + machine learning explorations on the same cluster. Well, the question is quite ambivalent: on the one hand, we have terabytes of versatile data and the necessity to make something like cubes (Hive and Hive on HBase are unsatisfactory). On the other, our users are accustomed to Tableau + Vertica. So, right now I am considering the following choices: 1) Platfora (not free, I don't know the price right now) + Spark 2) AtScale + Tableau (not free, I don't know the price right now) + Spark 3) Apache Kylin (young project?) + Spark on YARN + Kafka + Flume + some storage 4) Apache Phoenix + Apache HBase + Mondrian + Spark on YARN + Kafka + Flume (has somebody used it in production?) 5) Spark + Tableau (cubes?) For myself, I decided not to dive into Mesos. Cassandra is hard to configure; you'll have to dedicate a special employee to support it. I'll be glad to hear other ideas & propositions, as we are at the beginning of the process too. Sincerely yours, Tim Shenkao On 11/09/2015 09:46 AM, fightf...@163.com wrote: Hi, thanks for the suggestion. Actually we are now evaluating and stress-testing Spark SQL on Cassandra, while trying to define the business models. FWIW, the solution mentioned here is different from a traditional OLAP cube engine, right? So we are hesitating over the direction to choose for the OLAP architecture, and we are happy to hear more use cases from this community. Best, Sun. fightf...@163.com From: Jörn Franke Date: 2015-11-09 14:40 To: fightf...@163.com CC: user; dev Subject: Re: OLAP query using spark dataframe with cassandra Is there any distributor supporting these software components in combination? If not, and your core business is not software, then you may want to look for something else, because it might not make sense to build up internal know-how in all of these areas. In any case - it depends highly on your data and queries. You will have to do your own experiments. On 09 Nov 2015, at 07:02, "fightf...@163.com" wrote: Hi, community, we are especially interested in this integration, based on some slides from [1]. The SMACK stack (Spark + Mesos + Akka + Cassandra + Kafka) seems to be a good implementation of the lambda architecture in the open-source world, especially for non-Hadoop-based cluster environments. As we can see, the advantages obviously consist of: 1. the feasibility and scalability of the Spark DataFrame API, which can also be a perfect complement to Apache Cassandra's native CQL features. 2. both streaming and batch processing are available using the all-in-one stack, which is cool. 3. we can achieve both compactness and usability for Spark with Cassandra, including seamless integration with job scheduling and resource management. Our only concern is the OLAP query performance issue, which is mainly caused by frequent aggregation work over daily-growing large tables, for both Spark SQL and Cassandra. I can see that the use case in [1] uses FiloDB to achieve columnar storage and query performance, but we have no further knowledge of it.
The question is: does anyone have such a use case for now, especially in a production environment? We would be interested in your architecture for designing this OLAP engine using Spark + Cassandra. How do you think this scenario compares with a traditional OLAP cube design, like Apache Kylin or Pentaho Mondrian? Best Regards, Sun. [1] http://www.slideshare.net/planetcassandra/cassandra-summit-2014-interactive-olap-queries-using-apache-cassandra-and-spark fightf...@163.com
Re: Re: OLAP query using spark dataframe with cassandra
Hi, have you ever considered Cassandra as a replacement? We have almost the same usage as your engine, e.g. using MySQL to store the initially aggregated data. Can you share more about your kind of cube queries? We are very interested in that architecture too :) Best, Sun. fightf...@163.com From: Andrés Ivaldi Date: 2015-11-10 07:03 To: tsh CC: fightf...@163.com; user; dev Subject: Re: OLAP query using spark dataframe with cassandra Hi, I'm also considering something similar. Plain Spark is too slow for my case. A possible solution is to use Spark as a multiple-source connector and basic transformation layer, then persist the information (currently into an RDBMS); after that, with our engine, we build a kind of cube query, and the result is processed again by Spark, adding machine learning. Our missing part is to replace the RDBMS with something more suitable and scalable than an RDBMS; we don't care about pre-processing the information if, after pre-processing, the queries are fast. Regards On Mon, Nov 9, 2015 at 3:56 PM, tsh wrote: Hi, I'm in the same position right now: we are going to implement something like OLAP BI + machine learning explorations on the same cluster. Well, the question is quite ambivalent: on the one hand, we have terabytes of versatile data and the necessity to make something like cubes (Hive and Hive on HBase are unsatisfactory). On the other, our users are accustomed to Tableau + Vertica. So, right now I am considering the following choices: 1) Platfora (not free, I don't know the price right now) + Spark 2) AtScale + Tableau (not free, I don't know the price right now) + Spark 3) Apache Kylin (young project?) + Spark on YARN + Kafka + Flume + some storage 4) Apache Phoenix + Apache HBase + Mondrian + Spark on YARN + Kafka + Flume (has somebody used it in production?) 5) Spark + Tableau (cubes?) For myself, I decided not to dive into Mesos. Cassandra is hard to configure; you'll have to dedicate a special employee to support it. I'll be glad to hear other ideas & propositions, as we are at the beginning of the process too. Sincerely yours, Tim Shenkao On 11/09/2015 09:46 AM, fightf...@163.com wrote: Hi, thanks for the suggestion. Actually we are now evaluating and stress-testing Spark SQL on Cassandra, while trying to define the business models. FWIW, the solution mentioned here is different from a traditional OLAP cube engine, right? So we are hesitating over the direction to choose for the OLAP architecture, and we are happy to hear more use cases from this community. Best, Sun. fightf...@163.com From: Jörn Franke Date: 2015-11-09 14:40 To: fightf...@163.com CC: user; dev Subject: Re: OLAP query using spark dataframe with cassandra Is there any distributor supporting these software components in combination? If not, and your core business is not software, then you may want to look for something else, because it might not make sense to build up internal know-how in all of these areas. In any case - it depends highly on your data and queries. You will have to do your own experiments. On 09 Nov 2015, at 07:02, "fightf...@163.com" wrote: Hi, community, we are especially interested in this integration, based on some slides from [1]. The SMACK stack (Spark + Mesos + Akka + Cassandra + Kafka) seems to be a good implementation of the lambda architecture in the open-source world, especially for non-Hadoop-based cluster environments. As we can see, the advantages obviously consist of: 1. the feasibility and scalability of the Spark DataFrame API, which can also be a perfect complement to Apache Cassandra's native CQL features.
2. both streaming and batch processing are available using the all-in-one stack, which is cool. 3. we can achieve both compactness and usability for Spark with Cassandra, including seamless integration with job scheduling and resource management. Our only concern is the OLAP query performance issue, which is mainly caused by frequent aggregation work over daily-growing large tables, for both Spark SQL and Cassandra. I can see that the use case in [1] uses FiloDB to achieve columnar storage and query performance, but we have no further knowledge of it. The question is: does anyone have such a use case for now, especially in a production environment? We would be interested in your architecture for designing this OLAP engine using Spark + Cassandra. How do you think this scenario compares with a traditional OLAP cube design, like Apache Kylin or Pentaho Mondrian? Best Regards, Sun. [1] http://www.slideshare.net/planetcassandra/cassandra-summit-2014-interactive-olap-queries-using-apache-cassandra-and-spark fightf...@163.com -- Ing. Ivaldi Andres
Re: Spark Thrift doesn't start
I think the exception info just says clear that you may miss some tez related jar on the spark thrift server classpath. fightf...@163.com From: DaeHyun Ryu Date: 2015-11-11 14:47 To: user Subject: Spark Thrift doesn't start Hi folks, I configured tez as execution engine of Hive. After done that, whenever I started spark thrift server, it just stopped automatically. I checked log and saw the following messages. My spark version is 1.4.1 and tez version is 0.7.0 (IBM BigInsights 4.1) Does anyone have any idea on this ? java.lang.NoClassDefFoundError: org/apache/tez/dag/api/SessionNotRunning at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:353) at org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:116) at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:163) at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:161) at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:168) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028) at $iwC$$iwC.(:9) at $iwC.(:18) at (:20) at .(:24) at .() at .(:7) at .() at $print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:130) at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122) at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324) at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122) at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:974) at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:157) at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64) at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106) at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:64) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:991) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scal
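If Tez jars are indeed missing from the classpath, two possible workarounds are sketched below; the Tez paths are assumptions for a BigInsights-style layout, so adjust them to your installation:

# option 1: put the Tez client jars on the Thrift Server driver classpath
./sbin/start-thriftserver.sh \
  --driver-class-path "/usr/iop/current/tez-client/*:/usr/iop/current/tez-client/lib/*"

# option 2: in the hive-site.xml that Spark reads (under $SPARK_HOME/conf),
# set hive.execution.engine=mr so Spark's embedded Hive client does not need Tez at all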
Re: error while creating HiveContext
Hi, I think you just need to put hive-site.xml in the spark/conf directory, and it will be loaded onto the Spark classpath. Best, Sun. fightf...@163.com From: Chandra Mohan, Ananda Vel Murugan Date: 2015-11-27 15:04 To: user Subject: error while creating HiveContext Hi, I am building a spark-sql application in Java. I created a Maven project in Eclipse and added all dependencies, including spark-core and spark-sql. I am creating a HiveContext in my Spark program and then trying to run SQL queries against my Hive table. When I submit this job in Spark, for some reason it is trying to create a Derby metastore. But my hive-site.xml clearly specifies the JDBC URL of my MySQL database. So I think my hive-site.xml is not getting picked up by the Spark program. I specified the hive-site.xml path using the "--files" argument in spark-submit. I also tried placing the hive-site.xml file in my jar. I even tried creating a Configuration object with the hive-site.xml path and updating my HiveContext by calling the addResource() method. I want to know where I should put the Hive config files (in my jar, in my Eclipse project, or on my cluster) for them to be picked up correctly by my Spark program. Thanks for any help. Regards, Anand.C
Re: RE: error while creating HiveContext
Could you provide your hive-site.xml file info? Best, Sun. fightf...@163.com From: Chandra Mohan, Ananda Vel Murugan Date: 2015-11-27 17:04 To: fightf...@163.com; user Subject: RE: error while creating HiveContext Hi, I verified, and I can see hive-site.xml in the Spark conf directory. Regards, Anand.C From: fightf...@163.com [mailto:fightf...@163.com] Sent: Friday, November 27, 2015 12:53 PM To: Chandra Mohan, Ananda Vel Murugan ; user Subject: Re: error while creating HiveContext Hi, I think you just need to put hive-site.xml in the spark/conf directory, and it will be loaded onto the Spark classpath. Best, Sun. fightf...@163.com From: Chandra Mohan, Ananda Vel Murugan Date: 2015-11-27 15:04 To: user Subject: error while creating HiveContext Hi, I am building a spark-sql application in Java. I created a Maven project in Eclipse and added all dependencies, including spark-core and spark-sql. I am creating a HiveContext in my Spark program and then trying to run SQL queries against my Hive table. When I submit this job in Spark, for some reason it is trying to create a Derby metastore. But my hive-site.xml clearly specifies the JDBC URL of my MySQL database. So I think my hive-site.xml is not getting picked up by the Spark program. I specified the hive-site.xml path using the "--files" argument in spark-submit. I also tried placing the hive-site.xml file in my jar. I even tried creating a Configuration object with the hive-site.xml path and updating my HiveContext by calling the addResource() method. I want to know where I should put the Hive config files (in my jar, in my Eclipse project, or on my cluster) for them to be picked up correctly by my Spark program. Thanks for any help. Regards, Anand.C
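For reference, a sketch of the two usual ways to make the metastore configuration visible to a spark-submit job; the paths, class name, and jar name are placeholders:

# 1) copy the file into Spark's conf dir on the machine that launches the driver
cp /etc/hive/conf/hive-site.xml $SPARK_HOME/conf/

# 2) or ship it explicitly with the job
spark-submit --class com.example.MyApp \
  --files /etc/hive/conf/hive-site.xml \
  my-app.jar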
Re: New to Spark
Hi there, which Spark version are you using in this case? You made the Hive metastore usable by Spark; that means you can run SQL queries over the existing Hive tables, right? Or do you just use a local Hive metastore embedded on the Spark SQL side? I think you need to provide more info on your Spark SQL and Hive config; that would help locate the root cause of the problem. Best, Sun. fightf...@163.com From: Ashok Kumar Date: 2015-12-01 18:54 To: user@spark.apache.org Subject: New to Spark Hi, I am new to Spark. I am trying to use spark-sql with SPARK CREATED and HIVE CREATED tables. I have successfully made the Hive metastore usable by Spark. In spark-sql I can see the DDL for Hive tables. However, when I do select count(1) from HIVE_TABLE, it always returns zero rows. If I create a table in Spark as create table SPARK_TABLE as select * from HIVE_TABLE, the table schema is created but there is no data. I can then use Hive to do INSERT SELECT from HIVE_TABLE into SPARK_TABLE. That works. I can then use spark-sql to query the table. My questions: Is it correct that spark-sql only sees data in Spark-created tables but not data in Hive tables? How can I make Spark read data from existing Hive tables? Thanks
spark sql cli query results written to file ?
Hi, How can I save the results of queries run in the Spark SQL CLI and write them to some local file? Is there any available command? Thanks, Sun. fightf...@163.com
Re: Re: spark sql cli query results written to file ?
Well, sorry for the late response, and thanks a lot for pointing out the clue. fightf...@163.com From: Akhil Das Date: 2015-12-03 14:50 To: Sahil Sareen CC: fightf...@163.com; user Subject: Re: spark sql cli query results written to file ? Oops, 3 mins late. :) Thanks Best Regards On Thu, Dec 3, 2015 at 11:49 AM, Sahil Sareen wrote: Yeah, that's the example from the link I just posted. -Sahil On Thu, Dec 3, 2015 at 11:41 AM, Akhil Das wrote: Something like this? val df = sqlContext.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") It will save the name and favorite_color columns to a parquet file. You can read more information over here http://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes Thanks Best Regards On Thu, Dec 3, 2015 at 11:35 AM, fightf...@163.com wrote: Hi, How can I save the results of queries run in the Spark SQL CLI and write them to some local file? Is there any available command? Thanks, Sun. fightf...@163.com
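If running the query from a program (or spark-shell) is an option, a minimal sketch of writing the result out instead of capturing CLI output (Scala, Spark 1.4+; the query, table and paths are hypothetical):

val result = sqlContext.sql("select col_a, count(*) as cnt from my_table group by col_a")

// coalesce(1) produces a single output file at the cost of funnelling the result
// through one task; use a file:// path for a truly local file.
result.coalesce(1).write.format("json").save("file:///tmp/query-result-json")
result.write.parquet("/tmp/query-result.parquet")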
Re: About Spark On Hbase
Actually you can refer to https://github.com/cloudera-labs/SparkOnHBase Also, HBASE-13992 already integrates that feature on the HBase side, but it has not been released yet. Best, Sun. fightf...@163.com From: censj Date: 2015-12-09 15:04 To: user@spark.apache.org Subject: About Spark On Hbase Hi all, I am using Spark now, but I have not found an open source project for operating HBase from Spark. Can anyone tell me about one?
Re: Re: About Spark On Hbase
I don't think it really needs any CDH component. Just use the API. fightf...@163.com From: censj Date: 2015-12-09 15:31 To: fightf...@163.com CC: user@spark.apache.org Subject: Re: About Spark On Hbase But this depends on CDH, and I have not installed CDH. On 2015-12-09, at 15:18, fightf...@163.com wrote: Actually you can refer to https://github.com/cloudera-labs/SparkOnHBase Also, HBASE-13992 already integrates that feature on the HBase side, but it has not been released yet. Best, Sun. fightf...@163.com From: censj Date: 2015-12-09 15:04 To: user@spark.apache.org Subject: About Spark On Hbase Hi all, I am using Spark now, but I have not found an open source project for operating HBase from Spark. Can anyone tell me about one?
Re: Re: About Spark On Hbase
If you are using Maven, you can add the Cloudera Maven repo to the repositories in pom.xml and add the spark-hbase dependency. I just found this: http://spark-packages.org/package/nerdammer/spark-hbase-connector as Feng Dongyu recommended; you can try this as well, but I have no experience using it. fightf...@163.com From: censj Date: 2015-12-09 15:44 To: fightf...@163.com CC: user@spark.apache.org Subject: Re: About Spark On Hbase So how do I get this jar? I package my project with sbt and did not find it in the sbt libs. On 2015-12-09, at 15:42, fightf...@163.com wrote: I don't think it really needs any CDH component. Just use the API. fightf...@163.com From: censj Date: 2015-12-09 15:31 To: fightf...@163.com CC: user@spark.apache.org Subject: Re: About Spark On Hbase But this depends on CDH, and I have not installed CDH. On 2015-12-09, at 15:18, fightf...@163.com wrote: Actually you can refer to https://github.com/cloudera-labs/SparkOnHBase Also, HBASE-13992 already integrates that feature on the HBase side, but it has not been released yet. Best, Sun. fightf...@163.com From: censj Date: 2015-12-09 15:04 To: user@spark.apache.org Subject: About Spark On Hbase Hi all, I am using Spark now, but I have not found an open source project for operating HBase from Spark. Can anyone tell me about one?
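Without any CDH-specific package, the stock HBase TableInputFormat can also be used directly from Spark. A minimal sketch (Scala; table name, column family and qualifier are hypothetical):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")

// Each record is (row key, Result) for one HBase row.
val hbaseRDD = sc.newAPIHadoopRDD(
  hbaseConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

// e.g. pull one column out of each row
val values = hbaseRDD.map { case (_, result) =>
  Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("c1")))
}
println(values.count())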
count distinct in spark sql aggregation
Hi, I have a use case that needs daily, weekly or monthly active user counts computed from the native hourly data, which is a large dataset. The native datasets are updated continuously, and I want the distinct active user count per time dimension. Can anyone show an efficient way of achieving this? If I want the daily distinct active user count, should I take each hourly dataset of that day and do some calculation? My initial thought is to use a key-value store with a hashset holding the hourly user ids; I could then union and deduplicate the hourly user id sets to get the daily distinct count. However, I am not sure this implementation is an efficient workaround. Hope someone can shed a little light on this. Best, Sun. fightf...@163.com
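If the hourly data can be exposed as a table, Spark SQL can compute the per-day distinct counts directly. A minimal sketch (Scala; assumes a HiveContext so substr is available, and the hourly_events table with user_id / event_time columns is hypothetical). For very large cardinalities, approxCountDistinct in org.apache.spark.sql.functions may be a cheaper alternative.

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc.table("hourly_events").registerTempTable("events")

val dailyActive = hc.sql(
  """select substr(event_time, 1, 10) as day,
    |       count(distinct user_id)   as active_users
    |from events
    |group by substr(event_time, 1, 10)""".stripMargin)
dailyActive.show()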
Re: Re: Need help in setting up spark cluster
Hi, there For your analytics and real-time recommendation requirements, I would recommend using Spark SQL and the Hive thrift server to store and process your Spark Streaming data. The thrift server runs as a long-lived application, so it is quite feasible to consume data cyclically and serve analytical queries from it (see the sketch right after this thread). On the other hand, HBase or Cassandra would also be sufficient, and you may want to integrate Spark SQL with HBase / Cassandra for your data ingestion. You could deploy a CDH or HDP platform to support your production environment. I suggest you first deploy a Spark standalone cluster to run some integration tests, and you can also consider running Spark on YARN for later development use cases. Best, Sun. fightf...@163.com From: Jeetendra Gangele Date: 2015-07-23 13:39 To: user Subject: Re: Need help in setting up spark cluster Can anybody help here? On 22 July 2015 at 10:38, Jeetendra Gangele wrote: Hi All, I am trying to capture the user activities of a real estate portal. I am using a RabbitMQ and Spark Streaming combination where I push all the events to RabbitMQ and then consume them with Spark Streaming in 1-second micro-batches. Later on I am thinking of storing the consumed data for analytics or near real-time recommendations. Where should I store this data? In Spark RDDs themselves, so that people can query it with Spark SQL for analytics or real-time recommendations? The data is not huge, currently about 10 GB per day. Another alternative would be either HBase or Cassandra; which one would be better? Any suggestions? Also, for these use cases should I use an existing big data platform like Hortonworks, or can I deploy a standalone Spark cluster?
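A minimal sketch of the approach suggested above (hedged; Spark 1.4-era APIs, and activityStream, sqlContext and the Activity case class are hypothetical names): each streaming micro-batch is appended to a table that the thrift server or spark-sql can then query.

import org.apache.spark.sql.SaveMode

case class Activity(userId: String, page: String, ts: Long)

// activityStream: DStream[Activity] built from the RabbitMQ receiver
activityStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val df = sqlContext.createDataFrame(rdd)
    // Append each micro-batch to a table queryable via the thrift server.
    df.write.mode(SaveMode.Append).saveAsTable("user_activity")
  }
}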
Re: Functions in Spark SQL
Hi, there I tested with sqlContext.sql("select funcName(param1, param2, ...) from tableName") and it worked fine. Would you like to paste your test code here? And which version of Spark are you using? Best, Sun. fightf...@163.com From: vinod kumar Date: 2015-07-27 15:04 To: User Subject: Functions in Spark SQL Hi, May I know how to use the functions mentioned in http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.functions$ in Spark SQL? When I use something like "Select last(column) from tablename" I get an error like 15/07/27 03:00:00 INFO exec.FunctionRegistry: Unable to lookup UDF in metastore: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:NoSuchObjectException(message:Function default.last does not exist)) java.lang.RuntimeException: Couldn't find function last Thanks, Vinod
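An alternative to the raw SQL string is to call the same functions through the DataFrame API, which avoids the Hive UDF lookup shown in the error. A hedged sketch (Scala; assumes last is present in your Spark version's org.apache.spark.sql.functions, as the linked 1.4.0 scaladoc suggests; table and column names are hypothetical):

import org.apache.spark.sql.functions.{last, max}

val df = sqlContext.table("tablename")
// Global aggregate over the whole table; add groupBy(...) before agg for per-group values.
df.agg(last("some_column"), max("other_column")).show()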
Re: PermGen Space Error
Hi, Sarath Did you try using and increasing spark.executor.extraJavaOptions with -XX:PermSize= -XX:MaxPermSize= ? fightf...@163.com From: Sarath Chandra Date: 2015-07-29 17:39 To: user@spark.apache.org Subject: PermGen Space Error Dear All, I'm using - => Spark 1.2.0 => Hive 0.13.1 => Mesos 0.18.1 => Spring => JDK 1.7 I've written a Scala program which => instantiates a Spark and a Hive context => parses an XML file which provides the where clauses for queries => generates full-fledged Hive queries to be run on Hive tables => registers the obtained SchemaRDDs as temp tables to get reduced data sets to be queried further => prints the count of the finally obtained data set I'm running this Scala program programmatically through a java command (the command invokes a controller program that creates some useful value objects from input parameters and properties files and then calls the above Scala program). I'm getting a PermGen space error when it hits the last line to print the count. I'm printing the generated Hive queries from the Scala program to the console. When I run the same from a spark shell it works fine. As mentioned in some posts and blogs I tried using the option spark.driver.extraJavaOptions to increase the size, tried with 256 and 512, but still no luck. Please help me in resolving the space issue. Thanks & Regards, Sarath.
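A hedged sketch of setting those options (Scala; the sizes are only examples). Note that the driver JVM is already running by the time application code executes, so the driver-side option usually has to go on the spark-submit command line or into spark-defaults.conf rather than into the SparkConf.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("permgen-example")
  // Executors are launched after this conf is read, so the setting applies to each executor JVM.
  .set("spark.executor.extraJavaOptions", "-XX:PermSize=128m -XX:MaxPermSize=512m")
// Driver-side equivalent, typically passed outside the program, e.g.:
//   --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=512m"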
Spark standalone/Mesos on top of Ceph
Hi guys, Here is the info for Ceph: http://ceph.com/ We are investigating and using Ceph for distributed storage and monitoring, and are specifically interested in using Ceph as the underlying file system storage for Spark. However, we have no experience achieving that. Has anybody seen such progress? Best, Sun. fightf...@163.com
Re: Re: Spark standalone/Mesos on top of Ceph
Hi Jerry Yeah, we have already managed to run and use Ceph in a few of our production environments, especially with OpenStack. The reason we want to use Ceph is that we are looking for a unified storage layer, and the design concepts of Ceph are quite appealing. I am interested in work like the Hadoop CephFS plugin, and we are going to run some benchmark tests between HDFS and CephFS soon. So the ongoing progress would benefit if related work between Apache Spark and Ceph could contribute some thoughtful insights. BTW, regarding the Ceph Object Gateway S3 REST API, agreed on the inconvenience and some incompatibilities. However, we have not yet researched or tested radosgw much, though we do have some small requirements for using the gateway in certain use cases. Hoping for more consideration and discussion. Best, Sun. fightf...@163.com From: Jerry Lam Date: 2015-09-23 09:37 To: fightf...@163.com CC: user Subject: Re: Spark standalone/Mesos on top of Ceph Do you have specific reasons to use Ceph? I used Ceph before, and I'm not too in love with it, especially when I was using the Ceph Object Gateway S3 API. There are some incompatibilities with the AWS S3 API. You really, really need to try it before making the commitment. Did you manage to install it? On Tue, Sep 22, 2015 at 9:28 PM, fightf...@163.com wrote: Hi guys, Here is the info for Ceph: http://ceph.com/ We are investigating and using Ceph for distributed storage and monitoring, and are specifically interested in using Ceph as the underlying file system storage for Spark. However, we have no experience achieving that. Has anybody seen such progress? Best, Sun. fightf...@163.com
Re: Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
Hi, Jim Your generated rdd should be the type of RDD[ImmutableBytesWritable, KeyValue], while your current type goes to RDD[ImmutableBytesWritable, Put]. You can go like this and the result should be type of RDD[ImmutableBytesWritable, KeyValue] that can be savaAsNewHadoopFile val result = num.flatMap ( v=> { keyValueBuilder(v).map(v => (v,1)) }).map(v => ( new ImmutableBytesWritable(v._1.getBuffer(), v._1.getRowOffset(), v._1.getRowLength()),v._1)) where keyValueBuider would be defined as RDD[T] => RDD[List[KeyValue]], for example, you can go: val keyValueBuilder = (data: (Int, Int)) =>{ val rowkeyBytes = Bytes.toBytes(data._1) val colfam = Bytes.toBytes("cf") val qual = Bytes.toBytes("c1") val value = Bytes.toBytes("val_xxx") val kv = new KeyValue(rowkeyBytes,colfam,qual,value) List(kv) } Thanks, Sun fightf...@163.com From: Jim Green Date: 2015-01-28 04:44 To: Ted Yu CC: user Subject: Re: Bulk loading into hbase using saveAsNewAPIHadoopFile I used below code, and it still failed with the same error. Anyone has experience on bulk loading using scala? Thanks. import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.hbase.KeyValue import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat val conf = HBaseConfiguration.create() val tableName = "t1" val table = new HTable(conf, tableName) conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) val job = Job.getInstance(conf) job.setMapOutputKeyClass (classOf[ImmutableBytesWritable]) job.setMapOutputValueClass (classOf[KeyValue]) HFileOutputFormat.configureIncrementalLoad (job, table) val num = sc.parallelize(1 to 10) val rdd = num.map(x=>{ val put: Put = new Put(Bytes.toBytes(x)) put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes()) (new ImmutableBytesWritable(Bytes.toBytes(x)), put) }) rdd.saveAsNewAPIHadoopFile("/tmp/13", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf) On Tue, Jan 27, 2015 at 12:17 PM, Jim Green wrote: Thanks Ted. Could you give me a simple example to load one row data in hbase? How should I generate the KeyValue? I tried multiple times, and still can not figure it out. On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu wrote: Here is the method signature used by HFileOutputFormat : public void write(ImmutableBytesWritable row, KeyValue kv) Meaning, KeyValue is expected, not Put. On Tue, Jan 27, 2015 at 10:54 AM, Jim Green wrote: Hi Team, I need some help on writing a scala to bulk load some data into hbase. Env: hbase 0.94 spark-1.0.2 I am trying below code to just bulk load some data into hbase table “t1”. 
import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.hbase.KeyValue import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat val conf = HBaseConfiguration.create() val tableName = "t1" val table = new HTable(conf, tableName) conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) val job = Job.getInstance(conf) job.setMapOutputKeyClass (classOf[ImmutableBytesWritable]) job.setMapOutputValueClass (classOf[KeyValue]) HFileOutputFormat.configureIncrementalLoad (job, table) val num = sc.parallelize(1 to 10) val rdd = num.map(x=>{ val put: Put = new Put(Bytes.toBytes(x)) put.add("cf".getBytes(), "c1".getBy
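Putting Ted's point and the reply above together, a minimal sketch of the write path (hedged; HBase 0.94-era classes as used in this thread; the output path and cell values are just examples, and `job` is the Job configured with HFileOutputFormat.configureIncrementalLoad above):

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.util.Bytes

// HFileOutputFormat expects (ImmutableBytesWritable, KeyValue) records, not Put.
// For a real bulk load the records must also be totally sorted by row key.
val kvRdd = sc.parallelize(1 to 10).map { x =>
  val rowKey = Bytes.toBytes(x)
  val kv = new KeyValue(rowKey, Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("value_" + x))
  (new ImmutableBytesWritable(rowKey), kv)
}

kvRdd.saveAsNewAPIHadoopFile(
  "/tmp/hfiles-out",                 // hypothetical output directory for the HFiles
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat],
  job.getConfiguration)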
Re: Hi: hadoop 2.5 for spark
Hi, Siddharth You can rebuild Spark with Maven by specifying -Dhadoop.version=2.5.0 Thanks, Sun. fightf...@163.com From: Siddharth Ubale Date: 2015-01-30 15:50 To: user@spark.apache.org Subject: Hi: hadoop 2.5 for spark Hi, I am a beginner with Apache Spark. Can anyone let me know if it is mandatory to build Spark with the Hadoop version I am using, or can I use a pre-built package and use it with my existing HDFS root folder? I am using Hadoop 2.5.0 and want to use Apache Spark 1.2.0 with it. I could see a pre-built version for 2.4 and above in the downloads section of the Spark homepage -> downloads. Siddharth Ubale, Synchronized Communications #43, Velankani Tech Park, Block No. II, 3rd Floor, Electronic City Phase I, Bangalore – 560 100 Tel : +91 80 3202 4060 Web: www.syncoms.com London|Bangalore|Orlando we innovate, plan, execute, and transform the business
Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Hi, all Recently we had caught performance issues when using spark 1.2.0 to read data from hbase and do some summary work. Our scenario means to : read large data sets from hbase (maybe 100G+ file) , form hbaseRDD, transform to schemardd, groupby and aggregate the data while got fewer new summary data sets, loading data into hbase (phoenix). Our major issue lead to : aggregate large datasets to get summary data sets would consume too long time (1 hour +) , while that should be supposed not so bad performance. We got the dump file attached and stacktrace from jstack like the following: From the stacktrace and dump file we can identify that processing large datasets would cause frequent AppendOnlyMap growing, and leading to huge map entrysize. We had referenced the source code of org.apache.spark.util.collection.AppendOnlyMap and found that the map had been initialized with capacity of 64. That would be too small for our use case. So the question is : Does anyone had encounted such issues before? How did that be resolved? I cannot find any jira issues for such problems and if someone had seen, please kindly let us know. More specified solution would goes to : Does any possibility exists for user defining the map capacity releatively in spark? If so, please tell how to achieve that. Best Thanks, Sun. Thread 22432: (state = IN_JAVA) - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame) - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame) - org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame) - org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame) - org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame) - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame) - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame) - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame) Thread 22431: (state = IN_JAVA) - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame) - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame) - org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame) - 
org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame) - org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame) - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame) - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame) - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame) fightf...@163.com dump.png Description: Binary data - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Hi, Problem still exists. Any experts would take a look at this? Thanks, Sun. fightf...@163.com From: fightf...@163.com Date: 2015-02-06 17:54 To: user; dev Subject: Sort Shuffle performance issues about using AppendOnlyMap for large data sets Hi, all Recently we had caught performance issues when using spark 1.2.0 to read data from hbase and do some summary work. Our scenario means to : read large data sets from hbase (maybe 100G+ file) , form hbaseRDD, transform to schemardd, groupby and aggregate the data while got fewer new summary data sets, loading data into hbase (phoenix). Our major issue lead to : aggregate large datasets to get summary data sets would consume too long time (1 hour +) , while that should be supposed not so bad performance. We got the dump file attached and stacktrace from jstack like the following: From the stacktrace and dump file we can identify that processing large datasets would cause frequent AppendOnlyMap growing, and leading to huge map entrysize. We had referenced the source code of org.apache.spark.util.collection.AppendOnlyMap and found that the map had been initialized with capacity of 64. That would be too small for our use case. So the question is : Does anyone had encounted such issues before? How did that be resolved? I cannot find any jira issues for such problems and if someone had seen, please kindly let us know. More specified solution would goes to : Does any possibility exists for user defining the map capacity releatively in spark? If so, please tell how to achieve that. Best Thanks, Sun. Thread 22432: (state = IN_JAVA) - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame) - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame) - org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame) - org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame) - org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame) - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame) - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame) - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame) Thread 22431: (state = IN_JAVA) - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame) - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame) - 
org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame) - org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame) - org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame) - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame) - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame) - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame) fightf...@163.com 1 attachments dump.png(42K) download preview
parameter passed for AppendOnlyMap initialCapacity
Hi, all Can any experts show me what can be done to change the initialCapacity of the following? org.apache.spark.util.collection.AppendOnlyMap We have hit problems using Spark to process large data sets during the sort shuffle. Does Spark offer a configurable parameter that supports modifying this? Many thanks, fightf...@163.com
Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Hi, Really have no adequate solution got for this issue. Expecting any available analytical rules or hints. Thanks, Sun. fightf...@163.com From: fightf...@163.com Date: 2015-02-09 11:56 To: user; dev Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets Hi, Problem still exists. Any experts would take a look at this? Thanks, Sun. fightf...@163.com From: fightf...@163.com Date: 2015-02-06 17:54 To: user; dev Subject: Sort Shuffle performance issues about using AppendOnlyMap for large data sets Hi, all Recently we had caught performance issues when using spark 1.2.0 to read data from hbase and do some summary work. Our scenario means to : read large data sets from hbase (maybe 100G+ file) , form hbaseRDD, transform to schemardd, groupby and aggregate the data while got fewer new summary data sets, loading data into hbase (phoenix). Our major issue lead to : aggregate large datasets to get summary data sets would consume too long time (1 hour +) , while that should be supposed not so bad performance. We got the dump file attached and stacktrace from jstack like the following: From the stacktrace and dump file we can identify that processing large datasets would cause frequent AppendOnlyMap growing, and leading to huge map entrysize. We had referenced the source code of org.apache.spark.util.collection.AppendOnlyMap and found that the map had been initialized with capacity of 64. That would be too small for our use case. So the question is : Does anyone had encounted such issues before? How did that be resolved? I cannot find any jira issues for such problems and if someone had seen, please kindly let us know. More specified solution would goes to : Does any possibility exists for user defining the map capacity releatively in spark? If so, please tell how to achieve that. Best Thanks, Sun. 
Thread 22432: (state = IN_JAVA) - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame) - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame) - org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame) - org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame) - org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame) - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame) - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame) - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame) Thread 22431: (state = IN_JAVA) - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame) - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame) - org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame) - org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame) - org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame) - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame) - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor
Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Hi, patrick Really glad to get your reply. Yes, we are doing group by operations for our work. We know that this is common for growTable when processing large data sets. The problem actually goes to : Do we have any possible chance to self-modify the initialCapacity using specifically for our application? Does spark provide such configs for achieving that goal? We know that this is trickle to get it working. Just want to know that how could this be resolved, or from other possible channel for we did not cover. Expecting for your kind advice. Thanks, Sun. fightf...@163.com From: Patrick Wendell Date: 2015-02-12 16:12 To: fightf...@163.com CC: user; dev Subject: Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets The map will start with a capacity of 64, but will grow to accommodate new data. Are you using the groupBy operator in Spark or are you using Spark SQL's group by? This usually happens if you are grouping or aggregating in a way that doesn't sufficiently condense the data created from each input partition. - Patrick On Wed, Feb 11, 2015 at 9:37 PM, fightf...@163.com wrote: > Hi, > > Really have no adequate solution got for this issue. Expecting any available > analytical rules or hints. > > Thanks, > Sun. > > ________ > fightf...@163.com > > > From: fightf...@163.com > Date: 2015-02-09 11:56 > To: user; dev > Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for > large data sets > Hi, > Problem still exists. Any experts would take a look at this? > > Thanks, > Sun. > > > fightf...@163.com > > > From: fightf...@163.com > Date: 2015-02-06 17:54 > To: user; dev > Subject: Sort Shuffle performance issues about using AppendOnlyMap for large > data sets > Hi, all > Recently we had caught performance issues when using spark 1.2.0 to read > data from hbase and do some summary work. > Our scenario means to : read large data sets from hbase (maybe 100G+ file) , > form hbaseRDD, transform to schemardd, > groupby and aggregate the data while got fewer new summary data sets, > loading data into hbase (phoenix). > > Our major issue lead to : aggregate large datasets to get summary data sets > would consume too long time (1 hour +) , while that > should be supposed not so bad performance. We got the dump file attached and > stacktrace from jstack like the following: > > From the stacktrace and dump file we can identify that processing large > datasets would cause frequent AppendOnlyMap growing, and > leading to huge map entrysize. We had referenced the source code of > org.apache.spark.util.collection.AppendOnlyMap and found that > the map had been initialized with capacity of 64. That would be too small > for our use case. > > So the question is : Does anyone had encounted such issues before? How did > that be resolved? I cannot find any jira issues for such problems and > if someone had seen, please kindly let us know. > > More specified solution would goes to : Does any possibility exists for user > defining the map capacity releatively in spark? If so, please > tell how to achieve that. > > Best Thanks, > Sun. 
> >Thread 22432: (state = IN_JAVA) > - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, > line=224 (Compiled frame; information may be imprecise) > - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() > @bci=1, line=38 (Interpreted frame) > - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, > line=198 (Compiled frame) > - > org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, > scala.Function2) @bci=201, line=145 (Compiled frame) > - > org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, > scala.Function2) @bci=3, line=32 (Compiled frame) > - > org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) > @bci=141, line=205 (Compiled frame) > - > org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) > @bci=74, line=58 (Interpreted frame) > - > org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) > @bci=169, line=68 (Interpreted frame) > - > org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) > @bci=2, line=41 (Interpreted frame) > - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted > frame) > - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 > (Interpreted frame) > - > java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) > @bci=95, line=1145 (Interpreted frame) &g
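Following Patrick's point that the map grows when grouping does not condense the data, one common mitigation is to aggregate during the shuffle rather than grouping first and aggregating afterwards. A minimal sketch (Scala; the (key, metric) pairs are hypothetical stand-ins for whatever is extracted from the HBase rows):

// Stand-in for the (key, metric) pairs extracted from the HBase rows.
val pairs = sc.parallelize(Seq(("k1", 3L), ("k2", 5L), ("k1", 7L)))

// Running (sum, count) per key, combined map-side before the shuffle, so the
// per-partition map holds one small accumulator per key instead of all values.
val summary = pairs.aggregateByKey((0L, 0L))(
  (acc, v) => (acc._1 + v, acc._2 + 1L),
  (a, b)   => (a._1 + b._1, a._2 + b._2))

summary.collect().foreach { case (k, (sum, count)) => println(s"$k: sum=$sum count=$count") }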
Re: Spark code development practice
Hi, You can first set up a Scala IDE to develop and debug your Spark program, let's say IntelliJ IDEA or Eclipse. Thanks, Sun. fightf...@163.com From: Xi Shen Date: 2015-03-06 09:19 To: user@spark.apache.org Subject: Spark code development practice Hi, I am new to Spark. I see every Spark program has a main() function. I wonder if I can run the Spark program directly, without using spark-submit. I think it will be easier for early development and debugging. Thanks, David
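A minimal sketch of what that looks like in practice (Scala; input path is hypothetical): giving the SparkConf a local master lets the main() run straight from the IDE or with a plain java/scala command, no spark-submit needed.

import org.apache.spark.{SparkConf, SparkContext}

object WordCountLocal {
  def main(args: Array[String]): Unit = {
    // local[*] runs Spark inside this JVM using all available cores.
    val conf = new SparkConf().setAppName("WordCountLocal").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val counts = sc.textFile("/tmp/input.txt")
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)

    counts.take(10).foreach(println)
    sc.stop()
  }
}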
Re: Problem connecting to HBase
Hi, there You may want to check your HBase config, e.g. the zookeeper.znode.parent property: it is /hbase-unsecure on some distributions and /hbase on others, and the value the Spark driver uses has to match what your HBase cluster actually uses. fightf...@163.com From: HARIPRIYA AYYALASOMAYAJULA Date: 2015-03-14 10:47 To: user Subject: Problem connecting to HBase Hello, I am running an HBase test case. I am using the example from the following: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala I created a very small HBase table with 5 rows and 2 columns. I have attached a screenshot of the error log. I believe the problem is that the driver program is unable to establish a connection to HBase. The following is my simple.sbt: name := "Simple Project" version := "1.0" scalaVersion := "2.10.4" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.2.0", "org.apache.hbase" % "hbase" % "0.98.9-hadoop2" % "provided", "org.apache.hbase" % "hbase-client" % "0.98.9-hadoop2" % "provided", "org.apache.hbase" % "hbase-server" % "0.98.9-hadoop2" % "provided", "org.apache.hbase" % "hbase-common" % "0.98.9-hadoop2" % "provided" ) I am using a 23 node cluster, copied hbase-site.xml into the /spark/conf folder, and set spark.executor.extraClassPath pointing to the /hbase/ folder in spark-defaults.conf. Also, while submitting the Spark job I am including the required jars: spark-submit --class "HBaseTest" --master yarn-cluster --driver-class-path /opt/hbase/0.98.9/lib/hbase-server-0.98.9-hadoop2.jar:/opt/hbase/0.98.9/lib/hbase-protocol-0.98.9-hadoop2.jar:/opt/hbase/0.98.9/lib/hbase-hadoop2-compat-0.98.9-hadoop2.jar:/opt/hbase/0.98.9/lib/hbase-client-0.98.9-hadoop2.jar:/opt/hbase/0.98.9/lib/hbase-common-0.98.9-hadoop2.jar:/opt/hbase/0.98.9/lib/htrace-core-2.04.jar /home/priya/usingHBase/Spark/target/scala-2.10/simple-project_2.10-1.0.jar /Priya/sparkhbase-test1 It would be great if you could point out where I am going wrong and what could be done to correct it. Thank you for your time. -- Regards, Haripriya Ayyalasomayajula Graduate Student Department of Computer Science University of Houston Contact : 650-796-7112
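For reference, a hedged sketch of pinning those settings in code so the driver and executors use the same ZooKeeper quorum and znode parent as the HBase cluster (Scala; host names, port and table name are hypothetical, and the values must match your hbase-site.xml):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
hbaseConf.set("zookeeper.znode.parent", "/hbase-unsecure") // or "/hbase", depending on the distribution
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")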
Re: deploying Spark on standalone cluster
Hi, You may want to check your spark environment config in spark-env.sh, specifically for the SPARK_LOCAL_IP and check that whether you did modify that value, which may default be localhost. Thanks, Sun. fightf...@163.com From: sara mustafa Date: 2015-03-14 15:13 To: user Subject: deploying Spark on standalone cluster Hi, I am trying to deploy spark on standalone cluster of two machines on for master node and one for worker node. i have defined the two machines in conf/slaves file and also i /etc/hosts, when i tried to run the cluster the worker node is running but the master node failed to run and throw this error: 15/03/14 07:05:04 ERROR Remoting: Remoting error: [Startup failed] [ akka.remote.RemoteTransportException: Startup failed at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136) at akka.remote.Remoting.start(Remoting.scala:201) at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615) at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632) at akka.actor.ActorSystem$.apply(ActorSystem.scala:141) at akka.actor.ActorSystem$.apply(ActorSystem.scala:118) at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1765) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1756) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56) at org.apache.spark.deploy.master.Master$.startSystemAndActor(Master.scala:849) at org.apache.spark.deploy.master.Master$.main(Master.scala:829) at org.apache.spark.deploy.master.Master.main(Master.scala) Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: srnode1/10.0.0.5:7077 at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272) at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393) at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389) at scala.util.Success$$anonfun$map$1.apply(Try.scala:206) at scala.util.Try$.apply(Try.scala:161) at scala.util.Success.map(Try.scala:206) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) Can anyone help me? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/deploying-Spark-on-standalone-cluster-tp22049.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Building spark over specified tachyon
Hi, all Noting that the current Spark releases ship with Tachyon 0.5.0 built in, if we want to recompile Spark with Maven targeting a specific Tachyon version (let's say the most recent 0.6.0 release), how should that be done? What should the Maven compile command look like? Thanks, Sun. fightf...@163.com
Re: RE: Building spark over specified tachyon
Thanks, Jerry. I get that approach. I just wanted to make sure whether there is an option for directly specifying the Tachyon version. fightf...@163.com From: Shao, Saisai Date: 2015-03-16 11:10 To: fightf...@163.com CC: user Subject: RE: Building spark over specified tachyon I think you could change the pom file under the Spark project to update the Tachyon-related dependency version and rebuild it again (in case the API is compatible and the behavior is the same). I'm not sure whether there is any command you can use to compile against a given Tachyon version. Thanks Jerry From: fightf...@163.com [mailto:fightf...@163.com] Sent: Monday, March 16, 2015 11:01 AM To: user Subject: Building spark over specified tachyon Hi, all Noting that the current Spark releases ship with Tachyon 0.5.0 built in, if we want to recompile Spark with Maven targeting a specific Tachyon version (let's say the most recent 0.6.0 release), how should that be done? What should the Maven compile command look like? Thanks, Sun. fightf...@163.com
Re: Re: Building spark over specified tachyon
Thanks haoyuan. fightf...@163.com From: Haoyuan Li Date: 2015-03-16 12:59 To: fightf...@163.com CC: Shao, Saisai; user Subject: Re: RE: Building spark over specified tachyon Here is a patch: https://github.com/apache/spark/pull/4867 On Sun, Mar 15, 2015 at 8:46 PM, fightf...@163.com wrote: Thanks, Jerry I got that way. Just to make sure whether there can be some option to directly specifying tachyon version. fightf...@163.com From: Shao, Saisai Date: 2015-03-16 11:10 To: fightf...@163.com CC: user Subject: RE: Building spark over specified tachyon I think you could change the pom file under Spark project to update the Tachyon related dependency version and rebuild it again (in case API is compatible, and behavior is the same). I’m not sure is there any command you can use to compile against Tachyon version. Thanks Jerry From: fightf...@163.com [mailto:fightf...@163.com] Sent: Monday, March 16, 2015 11:01 AM To: user Subject: Building spark over specified tachyon Hi, all Noting that the current spark releases are built-in with tachyon 0.5.0 , if we want to recompile spark with maven and targeting on specific tachyon version (let's say the most recent 0.6.0 release), how should that be done? What maven compile command should be like ? Thanks, Sun. fightf...@163.com -- Haoyuan Li AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
Re: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Hi, Sandeep From your error log I can see that jdbc driver not found in your classpath. Did you had your mysql jdbc jar correctly configured in the specific classpath? Can you establish a hive jdbc connection using the url : jdbc:hive2://localhost:1 ? Thanks, Sun. fightf...@163.com From: sandeep vura Date: 2015-03-16 14:13 To: Ted Yu CC: user@spark.apache.org Subject: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient Hi Ted, Did you find any solution. Thanks Sandeep On Mon, Mar 16, 2015 at 10:44 AM, sandeep vura wrote: Hi Ted, I am using Spark -1.2.1 and hive -0.13.1 you can check my configuration files attached below. ERROR IN SPARK n: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav a:346) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkS QLCLIDriver.scala:101) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQ LCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces sorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.h ive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore Utils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(Retry ingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Ret ryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.ja va:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav a:340) ... 9 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstruct orAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingC onstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:534) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore Utils.java:1410) ... 14 more Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional con nection factory NestedThrowables: java.lang.reflect.InvocationTargetException at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusExc eption(NucleusJDOHelper.java:587) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfigurat ion(JDOPersistenceManagerFactory.java:788) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenc
Re: Running Scala Word Count Using Maven
Hi, If you use maven, what is the actual compiling errors? fightf...@163.com From: Su She Date: 2015-03-16 13:20 To: user@spark.apache.org Subject: Running Scala Word Count Using Maven Hello Everyone, I am trying to run the Word Count from here: https://github.com/holdenk/learning-spark-examples/blob/master/mini-complete-example/src/main/scala/com/oreilly/learningsparkexamples/mini/scala/WordCount.scala I was able to successfully run the app using SBT, but not Maven. I don't see a difference between my pom file and the one used online. From what I understand from "Learning Spark", it should be possible to package scala code using Maven? My scala code is HelloWorld.scala under /src/main/scala/ and is exactly the same as the example online. Thank you for the help! My pom file: http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://w3.org/2001/XMLSchema-instance"; xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> edu.berkely simple-project 4.0.0 Simple Project jar 1.0 cloudera http://repository.cloudera.com/artifactory/cloudera-repos/ org.apache.spark spark-core_2.10 1.2.0-cdh5.3.0 org.apache.hadoop hadoop-client 2.5.0-mr1-cdh5.3.0 org.apache.spark spark-streaming-kafka_2.10 1.2.0 org.apache.spark spark-streaming_2.10 1.2.0 org.scala-lang scala-library 2.10.4 org.scala-lang scala-compiler 2.10.4 org.apache.kafka kafka_2.10 0.8.2-beta com.101tec zkclient 0.3 com.yammer.metrics metrics-core 2.2.0 org.apache.kafka kafka-clients 0.8.2-beta org.apache.hadoop hadoop-yarn-server-web-proxy 2.5.0 com.datastax.spark spark-cassandra-connector_2.10 1.2.0-alpha1 com.datastax.spark spark-cassandra-connector-java_2.10 1.2.0-alpha1 com.datastax.cassandra cassandra-driver-core 2.1.3 org.apache.cassandra cassandra-thrift 2.1.3 org.apache.thrift libthrift 0.9.2 org.apache.cassandra cassandra-clientutil 2.1.3 com.google.guava guava 18.0 org.apache.maven.plugins maven-compiler-plugin 3.1 1.7 1.7 Thank you for the help! -Su
Re: StorageLevel: OFF_HEAP
Hi, Ranga That's true. Typically a version mis-match issue. Note that spark 1.2.1 has tachyon built in with version 0.5.0 , I think you may need to rebuild spark with your current tachyon release. We had used tachyon for several of our spark projects in a production environment. Thanks, Sun. fightf...@163.com From: Ranga Date: 2015-03-18 06:45 To: user@spark.apache.org Subject: StorageLevel: OFF_HEAP Hi I am trying to use the OFF_HEAP storage level in my Spark (1.2.1) cluster. The Tachyon (0.6.0-SNAPSHOT) nodes seem to be up and running. However, when I try to persist the RDD, I get the following error: ERROR [2015-03-16 22:22:54,017] ({Executor task launch worker-0} TachyonFS.java[connect]:364) - Invalid method name: 'getUserUnderfsTempFolder' ERROR [2015-03-16 22:22:54,050] ({Executor task launch worker-0} TachyonFS.java[getFileId]:1020) - Invalid method name: 'user_getFileId' Is this because of a version mis-match? On a different note, I was wondering if Tachyon has been used in a production environment by anybody in this group? Appreciate your help with this. - Ranga
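For reference, a minimal sketch of how the OFF_HEAP level is used (Scala; input path is hypothetical). It only works when the Tachyon client bundled into the Spark build matches the running Tachyon cluster, which is the version mismatch suspected above.

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///data/input.txt")
rdd.persist(StorageLevel.OFF_HEAP)   // blocks are stored in Tachyon instead of the executor heap
println(rdd.count())                 // the first action materializes the off-heap blocks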
Re: saveAsTable fails to save RDD in Spark SQL 1.3.0
Looks like some authentification issues. Can you check that your current user had authority to operate (maybe r/w/x) on /user/hive/warehouse? Thanks, Sun. fightf...@163.com From: smoradi Date: 2015-03-18 09:24 To: user Subject: saveAsTable fails to save RDD in Spark SQL 1.3.0 Hi, Basically my goal is to make the Spark SQL RDDs available to Tableau software through Simba ODBC driver. I’m running standalone Spark 1.3.0 on Ubuntu 14.04. Got the source code and complied it with maven. Hive is also setup and connected to mysql all on a the same machine. The hive-site.xml file has been copied to spark/conf. Here is the content of the hive-site.xml: javax.jdo.option.ConnectionURL jdbc:MySql://localhost:3306/metastore_db?createDatabaseIfNotExist=true metadata is stored in a MySQL server hive.metastore.schema.verification false javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver MySQL JDBC driver class javax.jdo.option.ConnectionUserName hiveuser user name for connecting to mysql server javax.jdo.option.ConnectionPassword hivepassword password for connecting to mysql server Both hive and mysql work just fine. I can create a table with Hive and find it in mysql. The thriftserver is also configured and connected to the spark master. Everything works just fine and I can monitor all the workers and running applications through spark master UI. I have a very simple python script to convert a json file to an RDD like this: import json def transform(data): ts = data[:25].strip() jss = data[41:].strip() jsj = json.loads(jss) jsj['ts'] = ts return json.dumps(jsj) from pyspark.sql import HiveContext sqlContext = HiveContext(sc) rdd = sc.textFile("myfile") tbl = sqlContext.jsonRDD(rdd.map(transform)) tbl.saveAsTable("neworder") the saveAsTable fails with this: 15/03/17 17:22:17 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool Traceback (most recent call last): File "", line 1, in File "/opt/spark/python/pyspark/sql/dataframe.py", line 191, in saveAsTable self._jdf.saveAsTable(tableName, source, jmode, joptions) File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o31.saveAsTable. 
: java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/user/hive/warehouse/neworder/_temporary/0/task_201503171618_0008_r_01/part-r-2.parquet; isDirectory=false; length=5591; replication=1; blocksize=33554432; modification_time=142663430; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/user/hive/warehouse/neworder/part-r-2.parquet at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43) at org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:649) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:126) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308) at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:217) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55) at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1048) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1018) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606)
rdd.cache() not working ?
Hi, all Running the following code snippet through spark-shell, I cannot see any cached storage partitions in the web UI. Does this mean that cache is not working? If we issue person.count again we do not see any noticeable performance improvement either. Hope anyone can explain this a little. Best, Sun.
   case class Person(id: Int, col1: String)
   val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
   person.cache
   person.count
fightf...@163.com
Re: Re: rdd.cache() not working ?
Hi That is exactly the issue. After running person.cache we then run person.count; however, there is still no cached data shown under the web UI Storage tab. Thanks, Sun. fightf...@163.com From: Taotao.Li Date: 2015-04-01 14:02 To: fightfate CC: user Subject: Re: rdd.cache() not working ? Rerun person.count and you will see the effect of the cache. person.cache does not cache the RDD right away; it is actually cached after the first action (person.count here). From: fightf...@163.com To: "user" Sent: Wednesday, April 1, 2015, 1:21:25 PM Subject: rdd.cache() not working ? Hi, all Running the following code snippet through spark-shell, I cannot see any cached storage partitions in the web UI. Does this mean that cache is not working? If we issue person.count again we do not see any noticeable performance improvement either. Hope anyone can explain this a little. Best, Sun.
   case class Person(id: Int, col1: String)
   val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
   person.cache
   person.count
fightf...@163.com -- Thanks & Best regards 李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer Address: Wanxiang Tower 8F, Lujiazui West Rd. No.99, Pudong New District, Shanghai, 200120 Phone: 021-60216502 Mobile: +86-18202171279
Re: Re: rdd.cache() not working ?
Hi Still no luck following your suggestion. Best, Sun. fightf...@163.com From: Yuri Makhno Date: 2015-04-01 15:26 To: fightf...@163.com CC: Taotao.Li; user Subject: Re: Re: rdd.cache() not working ? The cache() method returns a new RDD, so you have to use something like this:
   val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
   val cached = person.cache
   cached.count
When you rerun count on cached you will see that the cache works. On Wed, Apr 1, 2015 at 9:35 AM, fightf...@163.com wrote: Hi That is exactly the issue. After running person.cache we then run person.count; however, there is still no cached data shown under the web UI Storage tab. Thanks, Sun. fightf...@163.com From: Taotao.Li Date: 2015-04-01 14:02 To: fightfate CC: user Subject: Re: rdd.cache() not working ? Rerun person.count and you will see the effect of the cache. person.cache does not cache the RDD right away; it is actually cached after the first action (person.count here). From: fightf...@163.com To: "user" Sent: Wednesday, April 1, 2015, 1:21:25 PM Subject: rdd.cache() not working ? Hi, all Running the following code snippet through spark-shell, I cannot see any cached storage partitions in the web UI. Does this mean that cache is not working? If we issue person.count again we do not see any noticeable performance improvement either. Hope anyone can explain this a little. Best, Sun.
   case class Person(id: Int, col1: String)
   val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
   person.cache
   person.count
fightf...@163.com -- Thanks & Best regards 李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer Address: Wanxiang Tower 8F, Lujiazui West Rd. No.99, Pudong New District, Shanghai, 200120 Phone: 021-60216502 Mobile: +86-18202171279
Re: Re: rdd.cache() not working ?
Hi all Thanks a lot for capturing this. We are now using the 1.3.0 release. We tested with both the prebuilt Spark distribution and a version compiled from source against our CDH components, and the cache result did not show up as expected. However, if we create a DataFrame from the person RDD and use the sqlContext.cacheTable operation, we can see the cached results. Not sure what's happening here. If anyone can reproduce this issue, please let me know. Thanks, Sun fightf...@163.com From: Sean Owen Date: 2015-04-01 15:54 To: Yuri Makhno CC: fightf...@163.com; Taotao.Li; user Subject: Re: Re: rdd.cache() not working ? No, cache() changes the bookkeeping of the existing RDD. Although it returns a reference, it works to just call "person.cache". I can't reproduce this. When I try to cache an RDD and then count it, it is persisted in memory and I see it in the web UI. Something else must be different about what's being executed. On Wed, Apr 1, 2015 at 8:26 AM, Yuri Makhno wrote:
> The cache() method returns a new RDD, so you have to use something like this:
>
>    val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
>    val cached = person.cache
>    cached.count
>
> When you rerun count on cached you will see that the cache works.
>
> On Wed, Apr 1, 2015 at 9:35 AM, fightf...@163.com wrote:
>>
>> Hi
>> That is exactly the issue. After running person.cache we then run person.count;
>> however, there is still no cached data shown under the web UI Storage tab.
>>
>> Thanks,
>> Sun.
>>
>> fightf...@163.com
>>
>> From: Taotao.Li
>> Date: 2015-04-01 14:02
>> To: fightfate
>> CC: user
>> Subject: Re: rdd.cache() not working ?
>> Rerun person.count and you will see the effect of the cache.
>>
>> person.cache does not cache the RDD right away; it is actually cached after the first action (person.count here).
>>
>> From: fightf...@163.com
>> To: "user"
>> Sent: Wednesday, April 1, 2015, 1:21:25 PM
>> Subject: rdd.cache() not working ?
>>
>> Hi, all
>>
>> Running the following code snippet through spark-shell, I cannot see any cached storage partitions in the web UI.
>>
>> Does this mean that cache is not working? If we issue person.count again we do not see any noticeable performance improvement either. Hope anyone can explain this a little.
>>
>> Best,
>> Sun.
>>
>>    case class Person(id: Int, col1: String)
>>    val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
>>    person.cache
>>    person.count
>>
>> fightf...@163.com
>>
>> --
>> Thanks & Best regards
>> 李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer
>> Address: Wanxiang Tower 8F, Lujiazui West Rd. No.99, Pudong New District, Shanghai, 200120
>> Phone: 021-60216502  Mobile: +86-18202171279
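For completeness, a minimal spark-shell sketch (not taken from the thread) of how to confirm programmatically whether an RDD is actually persisted, independent of the web UI; getStorageLevel and sc.getRDDStorageInfo are standard Spark APIs:
   case class Person(id: Int, col1: String)
   val person = sc.textFile("hdfs://namenode_host:8020/user/person.txt").map(_.split(",")).map(p => Person(p(0).trim.toInt, p(1)))
   person.cache()
   person.count()                                // the first action materializes the cache
   println(person.getStorageLevel)               // persistence level set on this RDD (memory-only after cache)
   println(sc.getRDDStorageInfo.mkString("\n"))  // RDDs that currently hold cached partitions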
Re: Cannot run the example in the Spark 1.3.0 following the document
Hi there, you may need to add: import sqlContext.implicits._ Best, Sun fightf...@163.com From: java8964 Date: 2015-04-03 10:15 To: user@spark.apache.org Subject: Cannot run the example in the Spark 1.3.0 following the document I tried to check out Spark SQL 1.3.0. I installed it and followed the online document here: http://spark.apache.org/docs/latest/sql-programming-guide.html In the example, it shows something like this:
   // Select everybody, but increment the age by 1
   df.select("name", df("age") + 1).show()
   // name    (age + 1)
   // Michael null
   // Andy    31
   // Justin  20
But what I got on my Spark 1.3.0 is the following error:
[spark-shell welcome banner]  version 1.3.0
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_43)
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@1c845f64
scala> val df = sqlContext.jsonFile("/user/yzhang/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
scala> df.select("name", df("age") + 1).show()
<console>:30: error: overloaded method value select with alternatives:
  (col: String,cols: String*)org.apache.spark.sql.DataFrame
  (cols: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame
 cannot be applied to (String, org.apache.spark.sql.Column)
              df.select("name", df("age") + 1).show()
                 ^
Is this a bug in Spark 1.3.0, or does my build have some problem? Thanks
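As a follow-up, a minimal sketch (illustrative, not output from an actual session) of the two usual ways around this particular overload error in a 1.3.0 spark-shell: the first uses the import suggested above, the second simply passes Column arguments everywhere instead of mixing a String with a Column.
   // Option 1: bring the implicits into scope so the $"col" syntax builds Columns.
   import sqlContext.implicits._
   df.select($"name", $"age" + 1).show()
   // Option 2: build every argument explicitly as a Column via df("col").
   df.select(df("name"), df("age") + 1).show()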
Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Hi, there Which version are you using? Actually the problem seems to be gone after we changed our Spark version from 1.2.0 to 1.3.0. Not sure which internal changes made the difference. Best, Sun. fightf...@163.com From: Night Wolf Date: 2015-05-12 22:05 To: fightf...@163.com CC: Patrick Wendell; user; dev Subject: Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets Seeing similar issues, did you find a solution? One would be to increase the number of partitions if you're doing lots of object creation. On Thu, Feb 12, 2015 at 7:26 PM, fightf...@163.com wrote: Hi, Patrick Really glad to get your reply. Yes, we are doing group by operations for our work. We know that this growTable behaviour is common when processing large data sets. The question actually comes down to: do we have any chance to tune the initialCapacity specifically for our application? Does Spark provide any config for achieving that goal? We know that this is tricky to get working. Just want to know how this could be resolved, or whether there is another channel we did not cover. Expecting your kind advice. Thanks, Sun. fightf...@163.com From: Patrick Wendell Date: 2015-02-12 16:12 To: fightf...@163.com CC: user; dev Subject: Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets The map will start with a capacity of 64, but will grow to accommodate new data. Are you using the groupBy operator in Spark or are you using Spark SQL's group by? This usually happens if you are grouping or aggregating in a way that doesn't sufficiently condense the data created from each input partition. - Patrick On Wed, Feb 11, 2015 at 9:37 PM, fightf...@163.com wrote:
> Hi,
>
> We really have no adequate solution for this issue yet. Expecting any available analysis or hints.
>
> Thanks,
> Sun.
>
> fightf...@163.com
>
> From: fightf...@163.com
> Date: 2015-02-09 11:56
> To: user; dev
> Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
> Hi,
> The problem still exists. Would any experts take a look at this?
>
> Thanks,
> Sun.
>
> fightf...@163.com
>
> From: fightf...@163.com
> Date: 2015-02-06 17:54
> To: user; dev
> Subject: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
> Hi, all
> Recently we hit performance issues when using Spark 1.2.0 to read data from HBase and do some summary work.
> Our scenario is: read large data sets from HBase (maybe 100G+ files), form an hbaseRDD, transform it to a SchemaRDD,
> group by and aggregate the data into much smaller summary data sets, and load those into HBase (Phoenix).
>
> Our major issue is that aggregating the large data sets into summary data sets takes too long (1 hour+), which should not be such poor performance. We got the dump file attached and a stacktrace from jstack like the following:
>
> From the stacktrace and dump file we can see that processing large data sets causes the AppendOnlyMap to grow frequently, leading to a huge map entry size. We referenced the source code of org.apache.spark.util.collection.AppendOnlyMap and found that the map is initialized with a capacity of 64. That would be too small for our use case.
>
> So the question is: has anyone encountered such issues before? How was it resolved? I cannot find any JIRA issues for such problems; if someone has seen one, please kindly let us know.
>
> A more specific question would be: is there any possibility for the user to define the map capacity in Spark? If so, please tell us how to achieve that.
>
> Best Thanks,
> Sun.
>
> Thread 22432: (state = IN_JAVA)
>  - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise)
>  - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame)
>  - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame)
>  - org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame)
>  - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame)
>  - org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame)
>  - org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interp
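For readers hitting the same symptom, a minimal, self-contained sketch (not the original poster's code; the field names, input path, and partition count are purely illustrative) of the mitigation discussed in this thread: prefer a combining aggregation such as reduceByKey with a larger number of output partitions, so each map-side buffer condenses its input instead of accumulating whole groups.
   // Toy schema standing in for the HBase rows described above.
   case class Record(key: String, amount: Long)
   val records = sc.textFile("hdfs://namenode_host:8020/user/records.txt")
     .map(_.split(","))
     .map(p => Record(p(0).trim, p(1).trim.toLong))
   // reduceByKey combines values on the map side (unlike groupBy/groupByKey),
   // and the explicit partition count spreads the shuffle so no single
   // AppendOnlyMap has to grow to hold an entire group.
   val summary = records
     .map(r => (r.key, r.amount))
     .reduceByKey((a, b) => a + b, 400)   // 400 is an arbitrary example; tune for your data size
   summary.count()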