Re: Spark Function setup and cleanup
Thank you, but that doesn't answer my general question. I might need to enrich my records using different data sources (or DBs). So the general use case I need to support is to have some kind of Function that has init() logic for creating a connection to a DB, queries the DB for each record and enriches my input record with data from the DB, and has some kind of close() logic to close the connection. I have implemented this kind of use case with Map/Reduce and I want to know how I can do it with Spark. Thanks On Fri, Jul 25, 2014 at 6:24 AM, Yanbo Liang wrote: > You can refer to this topic > http://www.mapr.com/developercentral/code/loading-hbase-tables-spark > > > 2014-07-24 22:32 GMT+08:00 Yosi Botzer : > > In my case I want to reach HBase. For every record with userId I want to >> get some extra information about the user and add it to the result record for >> further processing >> >> >> On Thu, Jul 24, 2014 at 9:11 AM, Yanbo Liang >> wrote: >> >>> If you want to connect to a DB in your program, you can use JdbcRDD ( >>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala >>> ) >>> >>> >>> 2014-07-24 18:32 GMT+08:00 Yosi Botzer : >>> >>> Hi, I am using the Java API of Spark. I wanted to know if there is a way to run some code in a manner that is like the setup() and cleanup() methods of Hadoop Map/Reduce. The reason I need it is because I want to read something from the DB according to each record I scan in my Function, and I would like to open the DB connection only once (and close it only once). Thanks >>> >>> >> >
Re: Spark Function setup and cleanup
Look at mapPartitions. Whereas map turns one value V1 into one value V2, mapPartitions lets you turn one entire Iterator[V1] into one whole Iterator[V2]. The function that does so can perform some initialization at its start, then process all of the values, and clean up at its end. This is how you mimic a Mapper, really. The most literal translation of Hadoop MapReduce I can think of is:
Mapper: mapPartitions to turn many (K1,V1) into many (K2,V2)
(shuffle): groupByKey to turn that into (K2,Iterator[V2])
Reducer: mapPartitions to turn many (K2,Iterator[V2]) into many (K3,V3)
It's not necessarily optimal to do it this way -- especially the groupByKey bit. You have more expressive power here and need not fit it into this paradigm. But yes, you can get the same effect as in MapReduce, mostly from mapPartitions. On Sat, Jul 26, 2014 at 8:52 AM, Yosi Botzer wrote: > Thank you, but that doesn't answer my general question. > > I might need to enrich my records using different data sources (or DBs). > > So the general use case I need to support is to have some kind of Function > that has init() logic for creating a connection to a DB, queries the DB for each > record and enriches my input record with data from the DB, and has some kind > of close() logic to close the connection. > > I have implemented this kind of use case with Map/Reduce and I want to know > how I can do it with Spark.
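For reference, a minimal sketch of this per-partition setup/cleanup pattern in Scala. The JDBC URL, query, and record types below are placeholders, and the partition is materialized with toList so the connection can be closed before the iterator is returned:

import java.sql.DriverManager

import org.apache.spark.rdd.RDD

case class Record(userId: String)                       // placeholder input type
case class Enriched(userId: String, extra: String)      // placeholder output type

def enrich(records: RDD[Record]): RDD[Enriched] =
  records.mapPartitions { iter =>
    // "setup()": runs once per partition
    val conn = DriverManager.getConnection("jdbc:mysql://db-host/mydb")   // placeholder URL
    val stmt = conn.prepareStatement("SELECT extra FROM users WHERE id = ?")
    // Materialize the partition so all lookups happen while the connection is open
    val out = iter.map { r =>
      stmt.setString(1, r.userId)
      val rs = stmt.executeQuery()
      Enriched(r.userId, if (rs.next()) rs.getString(1) else "")
    }.toList
    // "cleanup()": runs once per partition
    stmt.close()
    conn.close()
    out.iterator
  }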
SparkSQL extensions
Hello, Could you point me to the modules I would need to update in order to add extra functionality to Spark SQL? I am thinking of implementing a region-join operator, and I guess I should add the implementation details under joins.scala, but what else do I need to modify? Thanks, Christos
Re: Hadoop client protocol mismatch with spark 1.0.1, cdh3u5
(For the benefit of other users) The workaround appears to be building spark for the exact Hadoop version and building the app with spark as a provided dependency + without the hadoop-client as a direct dependency of the app. With that, hdfs access works just fine. On Fri, Jul 25, 2014 at 11:50 PM, Bharath Ravi Kumar wrote: > That's right, I'm looking to depend on spark in general and change only > the hadoop client deps. The spark master and slaves use the > spark-1.0.1-bin-hadoop1 binaries from the downloads page. The relevant > snippet from the app's maven pom is as follows: > > > org.apache.spark > spark-core_2.10 > 1.0.1 > provided > > > org.apache.hadoop > hadoop-client > 0.20.2-cdh3u5 > jar > > > > > > Cloudera repository > > https://repository.cloudera.com/artifactory/cloudera-repos/ > > > Akka repository > http://repo.akka.io/releases > > > > > Thanks, > Bharath > > > On Fri, Jul 25, 2014 at 10:29 PM, Sean Owen wrote: > >> If you link against the pre-built binary, that's for Hadoop 1.0.4. Can >> you show your deps to clarify what you are depending on? Building >> custom Spark and depending on it is a different thing from depending >> on plain Spark and changing its deps. I think you want the latter. >> >> On Fri, Jul 25, 2014 at 5:46 PM, Bharath Ravi Kumar >> wrote: >> > Thanks for responding. I used the pre built spark binaries meant for >> > hadoop1,cdh3u5. I do not intend to build spark against a specific >> > distribution. Irrespective of whether I build my app with the explicit >> cdh >> > hadoop client dependency, I get the same error message. I also verified >> > that my app's uber jar had pulled in the cdh hadoop client >> dependencies. >> > >> > On 25-Jul-2014 9:26 pm, "Sean Owen" wrote: >> >> >> >> This indicates your app is not actually using the version of the HDFS >> >> client you think. You built Spark from source with the right deps it >> >> seems, but are you sure you linked to your build in your app? >> >> >> >> On Fri, Jul 25, 2014 at 4:32 PM, Bharath Ravi Kumar < >> reachb...@gmail.com> >> >> wrote: >> >> > Any suggestions to work around this issue ? The pre built spark >> >> > binaries >> >> > don't appear to work against cdh as documented, unless there's a >> build >> >> > issue, which seems unlikely. >> >> > >> >> > On 25-Jul-2014 3:42 pm, "Bharath Ravi Kumar" >> >> > wrote: >> >> >> >> >> >> >> >> >> I'm encountering a hadoop client protocol mismatch trying to read >> from >> >> >> HDFS (cdh3u5) using the pre-build spark from the downloads page >> (linked >> >> >> under "For Hadoop 1 (HDP1, CDH3)"). I've also followed the >> >> >> instructions at >> >> >> >> >> >> >> http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html >> >> >> (i.e. building the app against hadoop-client 0.20.2-cdh3u5), but >> >> >> continue to >> >> >> see the following error regardless of whether I link the app with >> the >> >> >> cdh >> >> >> client: >> >> >> >> >> >> 14/07/25 09:53:43 INFO client.AppClient$ClientActor: Executor >> updated: >> >> >> app-20140725095343-0016/1 is now RUNNING >> >> >> 14/07/25 09:53:43 WARN util.NativeCodeLoader: Unable to load >> >> >> native-hadoop >> >> >> library for your platform... using builtin-java classes where >> >> >> applicable >> >> >> 14/07/25 09:53:43 WARN snappy.LoadSnappy: Snappy native library not >> >> >> loaded >> >> >> Exception in thread "main" >> org.apache.hadoop.ipc.RPC$VersionMismatch: >> >> >> Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version >> >> >> mismatch. 
>> >> >> (client = 61, server = 63) >> >> >> at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:401) >> >> >> at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379) >> >> >> >> >> >> >> >> >> While I can build spark against the exact hadoop distro version, I'd >> >> >> rather work with the standard prebuilt binaries, making additional >> >> >> changes >> >> >> while building the app if necessary. Any >> workarounds/recommendations? >> >> >> >> >> >> Thanks, >> >> >> Bharath >> > >
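For reference, a minimal build.sbt sketch of the workaround described at the top of this thread. The project name and versions are placeholders; Spark itself is the build that matches the cluster's Hadoop, and hadoop-client is not declared as a direct dependency:

name := "my-spark-app"            // placeholder

scalaVersion := "2.10.4"

// Spark is provided by the cluster at runtime; no direct hadoop-client dependency.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" % "provided"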
Re: Emacs Setup Anyone?
Normally any setup that has an inferior mode for the Scala REPL will also support the Spark REPL (with little or no modification). Apart from that, I personally use the Spark REPL "normally" by invoking spark-shell in a shell inside Emacs, and I keep the Scala tags (etags) for Spark loaded. With this setup it is fairly fast to do tag prediction at point, which is not always accurate, but it is useful. In case you are working on building this (an inferior mode for the Spark REPL) for us, I can come up with a wishlist. Prashant Sharma On Sat, Jul 26, 2014 at 3:07 AM, Andrei wrote: > I have never tried the Spark REPL from within Emacs, but I remember that > switching from normal Python to PySpark was as simple as changing the > interpreter name at the beginning of the session. Seems like ensime [1] > (together with ensime-emacs [2]) should be a good place to start. For > example, take a look at ensime-sbt.el [3], which defines a number of > Scala/SBT commands. > > [1]: https://github.com/ensime/ensime-server > [2]: https://github.com/ensime/ensime-emacs > [3]: https://github.com/ensime/ensime-emacs/blob/master/ensime-sbt.el > > > > > On Thu, Jul 24, 2014 at 10:14 PM, Steve Nunez > wrote: > >> Anyone out there have a good configuration for emacs? Scala-mode sort of >> works, but I’d love to see a fully-supported spark-mode with an inferior >> shell. Searching didn’t turn up much of anything. >> >> Any emacs users out there? What setup are you using? >> >> Cheers, >> - SteveN > >
How can I integrate spark cluster into my own program without using spark-submit?
I want to use a Spark cluster through a Scala function, so that I can integrate Spark into my program directly. For example: when I call the count function in my own program, the program should deploy the work to the cluster, so I can get the result directly:

def count() = {
  val master = "spark://mache123:7077"
  val appName = "control_test"
  val sc = new SparkContext(master, appName)
  val rdd = sc.textFile("hdfs://123d101suse11sp3:9000/netflix/netflix.test")
  val count = rdd.count
  System.out.println("rdd.count = " + count)
  count
}
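A sketch of how that can look when run against a cluster (the jar path below is a placeholder): the assembly jar containing this class has to be handed to SparkContext via setJars so the executors can run the closures, and the context should be stopped when done.

import org.apache.spark.{SparkConf, SparkContext}

def count(): Long = {
  val conf = new SparkConf()
    .setMaster("spark://mache123:7077")
    .setAppName("control_test")
    .setJars(Seq("/path/to/my-app-assembly.jar"))   // placeholder path to your app jar
  val sc = new SparkContext(conf)
  try {
    val rdd = sc.textFile("hdfs://123d101suse11sp3:9000/netflix/netflix.test")
    rdd.count()
  } finally {
    sc.stop()
  }
}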
Fwd: Exception : org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table
> > Hi, > > This is my first code in shark 0.9.1. I am new to spark and shark. So I > don't know where I went wrong. It will be really helpful, If some one out > there can troubleshoot the problem. > First of all I will give a glimpse on my code which is developed in > IntellijIdea. This code is running perfectly in the editor > > *Code:* > > def main(args: Array[String]){ > val sparkConf = new > SparkConf().setAppName("SharkTest").setMaster("local") > .set("spark.executor.memory", "8g") > .set("spark.worker.memory", "8g") > .set("spark.executor.uri", "http://IP/spark/spark-0.9.1.tar.gz";) > .set("spark.mesos.coarse", "true") > > .setJars(List(args(1)+"/shark-assembly-0.9.1-hadoop2.0.0-cdh4.5.0.jar")) > val shc = SharkEnv.initWithSharkContext(sparkConf) > val q1="CREATE EXTERNAL TABLE IF NOT EXISTS table1(startIpNum > string,endIpNum string,locId string) ROW FORMAT DELIMITED FIELDS TERMINATED > BY '"+args(3)+"' LOCATION '"+args(2)+"' " > val q3="SELECT * FROM table1" > shc.runSql(q1) > shc.runSql(q3) > shc.sql2rdd(q3).map{resultSet=> > val > y=resultSet.colname2indexMap.values.map(index=>resultSet(index)).reduce((a,b)=>a+","+b) > y > }.saveAsTextFile(args(4)) > shc.sql("DROP TABLE IF EXISTS table1") > } > > *build.sbt:* > > > import AssemblyKeys._ > > assemblySettings > > name := "appname" > > version := "1.0" > > scalaVersion := "2.10.3" > > mainClass := Some("classname") > > libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "0.9.1", > "edu.berkeley.cs.shark" %% "shark" % "0.9.1", > "org.apache.hive" % "hive-anttasks" % "0.11.0", > "org.apache.hive" % "hive-beeline" % "0.11.0", > "org.apache.hive" % "hive-cli" % "0.11.0", > "org.apache.hive" % "hive-common" % "0.11.0", > "org.apache.hive" % "hive-exec" % "0.11.0", > "org.apache.hive" % "hive-hbase-handler" % "0.11.0", > "org.apache.hive" % "hive-hwi" % "0.11.0", > "org.apache.hive" % "hive-jdbc" % "0.11.0", > "org.apache.hive" % "hive-metastore" % "0.11.0", > "org.apache.hive" % "hive-serde" % "0.11.0", > "org.apache.hive" % "hive-service" % "0.11.0", > "org.apache.hive" % "hive-shims" % "0.11.0", > "org.datanucleus" % "datanucleus-core" % "3.2.2", > "org.datanucleus" % "datanucleus-rdbms" % "3.2.1", > "org.datanucleus" % "datanucleus-api-jdo" % "3.2.1", > "org.datanucleus" % "datanucleus-enhancer" % "3.1.1", > "org.apache.derby" % "derby" % "10.10.1.1", > "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.5.0") > > resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/";, > "Cloudera Repository" at " > https://repository.cloudera.com/artifactory/cloudera-repos/";) > > mergeStrategy in assembly := { > case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard > case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => > MergeStrategy.discard > case "log4j.properties" => MergeStrategy.discard > case m if m.toLowerCase.startsWith("meta-inf/services/") => > MergeStrategy.filterDistinctLines > case "reference.conf" => MergeStrategy.concat > case _ => MergeStrategy.first > } > > sbt assembly plugin version : 0.10.2 > > The problem is only when I am trying create the jar of the code. > > Steps followed to create the jar: > 1. Sbt clean > 2. Sbt assembly > > When I try to run the jar using the command "java -jar > " , an error comes as "invalid or corrupt jar" > The same jar is accepted when executed as "java -cp > . 
But in this case a hive exception > occurs as "unable to fetch the table tablename" > > > 14/07/26 12:21:39 INFO Driver: > 14/07/26 12:21:39 INFO Driver: > 14/07/26 12:21:39 INFO ParseDriver: Parsing command: CREATE EXTERNAL TABLE > IF NOT EXISTS table1(startIpNum string,endIpNum string,locId string) ROW > FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION > '/home/user/foldername/Input/SharkTest' > 14/07/26 12:21:39 INFO ParseDriver: Parse Completed > 14/07/26 12:21:40 INFO SharkSemanticAnalyzer: Starting Semantic Analysis > 14/07/26 12:21:40 INFO SharkSemanticAnalyzer: Creating table table1 > position=36 > 14/07/26 12:21:40 INFO HiveMetaStore: 0: Opening raw store with > implemenation class:org.apache.hadoop.hive.metastore.ObjectStore > 14/07/26 12:21:40 INFO ObjectStore: ObjectStore, initialize called > org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table > table1 > at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:957) > at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:904) > at > org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9328) > at > org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:8647) > at > shark.parse.SharkSemanticAnalyzer.analyzeInternal(SharkSemanticAnalyzer.scala:105) > at > org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:27
Help using streaming from Spark Shell
Hi, I'm starting spark-shell like this: SPARK_MEM=1g SPARK_JAVA_OPTS="-Dspark.cleaner.ttl=3600" /spark/bin/spark-shell -c 3 but when I try to create a streaming context val scc = new StreamingContext(sc, Seconds(10)) I get: org.apache.spark.SparkException: Spark Streaming cannot be used without setting spark.cleaner.ttl; set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell) at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:121) I also tried export SPARK_JAVA_OPTS="-Dspark.cleaner.ttl=3600" before calling spark-shell but with no luck... What am I doing wrong? This is spark 0.9.1 -- I cannot upgrade
Re: SparkSQL extensions
A very simple example of adding a new operator to Spark SQL: https://github.com/apache/spark/pull/1366 An example of adding a new type of join to Spark SQL: https://github.com/apache/spark/pull/837 Basically, you will need to add a new physical operator that inherits from SparkPlan and a Strategy that causes the query planner to select it. Maybe you can explain a little more what you mean by region-join? If it's only a different algorithm, and not a logically different type of join, then you will not need to make some of the logical modifications that the second PR did. Often the hardest part here is going to be figuring out when to use one join over another. Right now the rules are pretty straightforward: the joins that are picked first are the most efficient but only handle certain cases (inner joins with equality predicates). When that is not the case it falls back on slower, but more general operators. If there are more subtle trade-offs involved then we may need to wait until we have more statistics to help us make the choice. I'd suggest opening a JIRA and proposing a design before going too far. Michael On Sat, Jul 26, 2014 at 3:32 AM, Christos Kozanitis wrote: > Hello > > Could you point me to the modules I would need to update in order to add > extra functionality to Spark SQL? > > I am thinking of implementing a region-join operator and I guess I should > add the implementation details under joins.scala, but what else do I need to > modify? > > Thanks > Christos >
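To illustrate the Strategy part only, here is a rough, not compile-ready sketch of the shape such a planner rule could take. RegionJoin stands in for the new physical operator and isRegionCondition for a hypothetical predicate test; the exact Catalyst types to match on depend on the Spark version, so check the two PRs above for the real API:

object RegionJoinStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Pick the new operator when the join condition looks like a region predicate
    case Join(left, right, Inner, Some(condition)) if isRegionCondition(condition) =>
      RegionJoin(planLater(left), planLater(right), condition) :: Nil
    case _ => Nil
  }
}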
Lot of object serialization even with MEMORY_ONLY
Hello, I am executing the SparkPageRank example. It uses the "cache()" API for persistence of RDDs, and if I am not wrong, that in turn uses the MEMORY_ONLY storage level. However, the oprofile report shows a lot of activity in the writeObject0 function. There is not even a single "Spilling in-memory..." message in the output/log. This is because I am using a huge heap size of 120GB. Can someone please tell me why I see so much serialization happening, even though the MEMORY_ONLY storage level is used? The Spark version that I am using is 1.0.1 Thanks, Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Lot-of-object-serialization-even-with-MEMORY-ONLY-tp10722.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
"Spilling in-memory..." messages in log even with MEMORY_ONLY
Hello, I am running the SparkPageRank example, which uses the cache() API for persistence. This, AFAIK, uses the MEMORY_ONLY storage level. But even in this setup, I see a lot of "WARN ExternalAppendOnlyMap: Spilling in-memory map of" messages in the log. Why is that? I thought that MEMORY_ONLY means kicking out the RDD if there isn't enough memory available. Thanks, Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
graphx cached partitions wont go away
I have GraphX queries running inside a service where I collect the results to the driver and do not hold any references to the RDDs involved in the queries. My assumption was that with the references gone, Spark would go and remove the cached RDDs from memory (note: I did not cache them, GraphX did). Yet they hang around... Is my understanding of how the ContextCleaner works incorrect? Or could it be that GraphX holds some references internally to RDDs, preventing garbage collection? Maybe even circular references?
Re: graphx cached partitions wont go away
Never mind, I think it's just the GC taking its time while I have many gigabytes of unused cached RDDs that I cannot get rid of easily. On Jul 26, 2014 4:44 PM, "Koert Kuipers" wrote: > I have GraphX queries running inside a service where I collect the results > to the driver and do not hold any references to the RDDs involved in the > queries. My assumption was that with the references gone, Spark would go and > remove the cached RDDs from memory (note: I did not cache them, GraphX did). > > Yet they hang around... > > Is my understanding of how the ContextCleaner works incorrect? Or could it > be that GraphX holds some references internally to RDDs, preventing garbage > collection? Maybe even circular references? >
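If waiting for GC is not acceptable, one blunt workaround (a sketch; sc.getPersistentRDDs is a developer API) is to explicitly unpersist whatever the context still considers cached:

// Unpersist every RDD the context still reports as cached, without blocking.
sc.getPersistentRDDs.values.foreach(_.unpersist(blocking = false))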
Re: "Spilling in-memory..." messages in log even with MEMORY_ONLY
These messages are actually not about spilling the RDD, they're about spilling intermediate state in a reduceByKey, groupBy or other operation whose state doesn't fit in memory. We have to do that in these cases to avoid going out of memory. You can minimize spilling by having more reduce tasks though, which will mean less data per task. Matei On Jul 26, 2014, at 1:22 PM, lokesh.gidra wrote: > Hello, > > I am running SparkPageRank example which uses cache() API for persistence. > This AFAIK, uses MEMORY_ONLY storage level. But even in this setup, I see a > lot of "WARN ExternalAppendOnlyMap: Spilling in-memory map of" messages > in the log. Why is it so? I thought that MEMORY_ONLY means kick out the RDD > if there isn't enough memory available. > > > Thanks, > Lokesh > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723.html > Sent from the Apache Spark User List mailing list archive at Nabble.com.
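For example (pairs being whatever key-value RDD feeds the shuffle, and 200 an arbitrary value to tune), the number of reduce tasks can be passed directly to the operation:

// More reduce partitions => less intermediate state per task => less spilling.
val counts  = pairs.reduceByKey(_ + _, 200)
val grouped = pairs.groupByKey(200)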
Re: "Spilling in-memory..." messages in log even with MEMORY_ONLY
Thanks for the reply. I understand this now. But in another situation, when I use a large heap size to avoid any spilling (I confirm there are no spilling messages in the log), I still see a lot of time being spent in the writeObject0() function. Can you please tell me why there would be any serialization done? Thanks, Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10727.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Help using streaming from Spark Shell
You could also set the property using plain old System.setProperty("spark.cleaner.ttl", "3600") before creating the StreamingContext in the spark shell TD On Sat, Jul 26, 2014 at 7:50 AM, Yana Kadiyska wrote: > Hi, > > I'm starting spark-shell like this: > > SPARK_MEM=1g SPARK_JAVA_OPTS="-Dspark.cleaner.ttl=3600" > /spark/bin/spark-shell -c 3 > > but when I try to create a streaming context > val scc = new StreamingContext(sc, Seconds(10)) > > I get: > > org.apache.spark.SparkException: Spark Streaming cannot be used > without setting spark.cleaner.ttl; set this property before creating a > SparkContext creating a SparkContext (use SPARK_JAVA_OPTS for the > shell) > > at > org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:121) > > > I also tried export SPARK_JAVA_OPTS="-Dspark.cleaner.ttl=3600" > before calling spark-shell but with no luck... > > What am I doing wrong? This is spark 0.9.1 -- I cannot upgrade
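In the shell that would look roughly like this (a literal sketch of the suggestion above, for Spark 0.9.x):

import org.apache.spark.streaming.{Seconds, StreamingContext}

System.setProperty("spark.cleaner.ttl", "3600")
val ssc = new StreamingContext(sc, Seconds(10))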
Re: "spark.streaming.unpersist" and "spark.cleaner.ttl"
Yeah, I wrote those lines a while back, I wanted to contrast storage levels with and without serialization. Should have realized that StorageLevel.MEMORY_ONLY_SER can be confused to be the default level. TD On Wed, Jul 23, 2014 at 5:12 AM, Shao, Saisai wrote: > Yeah, the document may not be precisely aligned with latest code, so the best > way is to check the code. > > -Original Message- > From: Haopu Wang [mailto:hw...@qilinsoft.com] > Sent: Wednesday, July 23, 2014 5:56 PM > To: user@spark.apache.org > Subject: RE: "spark.streaming.unpersist" and "spark.cleaner.ttl" > > Jerry, thanks for the response. > > For the default storage level of DStream, it looks like Spark's document is > wrong. In this link: > http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning > It mentions: > "Default persistence level of DStreams: Unlike RDDs, the default persistence > level of DStreams serializes the data in memory (that is, > StorageLevel.MEMORY_ONLY_SER for DStream compared to StorageLevel.MEMORY_ONLY > for RDDs). Even though keeping the data serialized incurs higher > serialization/deserialization overheads, it significantly reduces GC pauses." > > I will take a look at DStream.scala although I have no Scala experience. > > -Original Message- > From: Shao, Saisai [mailto:saisai.s...@intel.com] > Sent: 2014年7月23日 15:13 > To: user@spark.apache.org > Subject: RE: "spark.streaming.unpersist" and "spark.cleaner.ttl" > > Hi Haopu, > > Please see the inline comments. > > Thanks > Jerry > > -Original Message- > From: Haopu Wang [mailto:hw...@qilinsoft.com] > Sent: Wednesday, July 23, 2014 3:00 PM > To: user@spark.apache.org > Subject: "spark.streaming.unpersist" and "spark.cleaner.ttl" > > I have a DStream receiving data from a socket. I'm using local mode. > I set "spark.streaming.unpersist" to "false" and leave " > spark.cleaner.ttl" to be infinite. > I can see files for input and shuffle blocks under "spark.local.dir" > folder and the size of folder keeps increasing, although JVM's memory usage > seems to be stable. > > [question] In this case, because input RDDs are persisted but they don't fit > into memory, so write to disk, right? And where can I see the details about > these RDDs? I don't see them in web UI. > > [answer] Yes, if memory is not enough to put input RDDs, this data will be > flush to disk, because the default storage level is "MEMORY_AND_DISK_SER_2" > as you can see in StreamingContext.scala. Actually you cannot not see the > input RDD in web UI, you can only see the cached RDD in web UI. > > Then I set "spark.streaming.unpersist" to "true", the size of > "spark.local.dir" folder and JVM's used heap size are reduced regularly. > > [question] In this case, because I didn't change "spark.cleaner.ttl", which > component is doing the cleanup? And what's the difference if I set > "spark.cleaner.ttl" to some duration in this case? > > [answer] If you set "spark.streaming.unpersist" to true, old unused rdd will > be deleted, as you can see in DStream.scala. While "spark.cleaner.ttl" is > timer-based spark cleaner, not only clean streaming data, but also broadcast, > shuffle and other data. > > Thank you! >
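As a small illustration of the two knobs discussed in this thread (host, port and batch interval are placeholders): spark.streaming.unpersist controls whether old input RDDs are dropped automatically, and the receiver's storage level can be overridden explicitly instead of relying on the default mentioned above:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("unpersist-demo")
  .set("spark.streaming.unpersist", "true")   // let streaming drop old input RDDs

val ssc = new StreamingContext(conf, Seconds(1))
// Override the receiver's storage level explicitly (placeholder host/port).
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY_SER)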
Re: "Spilling in-memory..." messages in log even with MEMORY_ONLY
Even in local mode, Spark serializes data that would be sent across the network, e.g. in a reduce operation, so that you can catch errors that would happen in distributed mode. You can make serialization much faster by using the Kryo serializer; see http://spark.apache.org/docs/latest/tuning.html. But it won't go away. Basically the code is not optimized for the very best performance on a single node, it's designed to make it easy to build your program locally and run it on a cluster without surprises. Matei On Jul 26, 2014, at 3:08 PM, lokesh.gidra wrote: > Thanks for the reply. I understand this now. > > But in another situation, when I use large heap size to avoid any spilling > (I confirm, there are no spilling messages in log), I still see a lot of > time being spent in writeObject0() function. Can you please tell me why > would there be any serialization done? > > > Thanks > Lokesh > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10727.html > Sent from the Apache Spark User List mailing list archive at Nabble.com.
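A minimal Kryo setup for Spark 1.0.x could look like this (MyRecord and the registrator's package name are placeholders):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

case class MyRecord(id: Long, value: Double)   // placeholder for your shuffled type

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyRecord])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "mypackage.MyRegistrator")   // placeholder package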
Re: "Spilling in-memory..." messages in log even with MEMORY_ONLY
Even in local mode, Spark serializes data that would be sent across the network, e.g. in a reduce operation, so that you can catch errors that would happen in distributed mode. You can make serialization much faster by using the Kryo serializer; see http://spark.apache.org/docs/latest/tuning.html. But it won't go away. Basically the code is not optimized for the very best performance on a single node, it's designed to make it easy to build your program locally and run it on a cluster without surprises. Matei On Jul 26, 2014, at 3:08 PM, lokesh.gidra wrote: > Thanks for the reply. I understand this now. > > But in another situation, when I use large heap size to avoid any spilling > (I confirm, there are no spilling messages in log), I still see a lot of > time being spent in writeObject0() function. Can you please tell me why > would there be any serialization done? > > > Thanks > Lokesh > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10727.html > Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: streaming window not behaving as advertised (v1.0.1)
Yeah, maybe I should bump the issue to major. Now that I thought about to give my previous answer, this should be easy to fix just by doing a foreachRDD on all the input streams within the system (rather than explicitly doing it like I asked you to do). Thanks Alan, for testing this out and confirming that this was the same issue. I was worried that this is a totally new issue that we did not know of. TD On Wed, Jul 23, 2014 at 12:37 AM, Alan Ngai wrote: > TD, it looks like your instincts were correct. I misunderstood what you > meant. If I force an eval on the inputstream using foreachRDD, the > windowing will work correctly. If I don’t do that, lazy eval somehow screws > with window batches I eventually receive. Any reason the bug is categorized > as minor? It seems that anyone who uses the windowing functionality would > run into this bug. I imagine this would include anyone who wants to use > spark streaming to aggregate data in fixed time batches, which seems like a > fairly common use case. > > Alan > > > > On Jul 22, 2014, at 11:30 PM, Alan Ngai wrote: > > foreachRDD is how I extracted values in the first place, so that’s not going > to make a difference. I don’t think it’s related to SPARK-1312 because I’m > generating data every second in the first place and I’m using foreachRDD > right after the window operation. The code looks something like > > val batchInterval = 5 > val windowInterval = 25 > val slideInterval = 15 > > val windowedStream = inputStream.window(Seconds(windowInterval), > Seconds(slideInterval)) > > val outputFunc = (r: RDD[MetricEvent], t: Time) => { > println(" > %s".format(t.milliseconds / 1000)) > r.foreach{metric => > val timeKey = metric.timeStamp / batchInterval * batchInterval > println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name, > metric.value)) > } > } > testWindow.foreachRDD(outputFunc) > > On Jul 22, 2014, at 10:13 PM, Tathagata Das > wrote: > > It could be related to this bug that is currently open. > https://issues.apache.org/jira/browse/SPARK-1312 > > Here is a workaround. Can you put a inputStream.foreachRDD(rdd => { }) and > try these combos again? > > TD > > > On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai wrote: >> >> I have a sample application pumping out records 1 per second. The batch >> interval is set to 5 seconds. Here’s a list of “observed window intervals” >> vs what was actually set >> >> window=25, slide=25 : observed-window=25, overlapped-batches=0 >> window=25, slide=20 : observed-window=20, overlapped-batches=0 >> window=25, slide=15 : observed-window=15, overlapped-batches=0 >> window=25, slide=10 : observed-window=20, overlapped-batches=2 >> window=25, slide=5 : observed-window=25, overlapped-batches=3 >> >> can someone explain this behavior to me? I’m trying to aggregate metrics >> by time batches, but want to skip partial batches. Therefore, I’m trying to >> find a combination which results in 1 overlapped batch, but no combination I >> tried gets me there. >> >> Alan >> > > >
Re: SparkContext startup time out
I am bumping into this problem as well. I am trying to move to akka 2.3.x from 2.2.x in order to port to Scala 2.11 - only akka 2.3.x is available for Scala 2.11. All 2.2.x akka versions work fine, and all 2.3.x akka versions give the following exception in "new SparkContext". Still investigating why... java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at akka.remote.Remoting.start(Remoting.scala:180) at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615) On Fri, May 30, 2014 at 6:33 AM, Pierre B < pierre.borckm...@realimpactanalytics.com> wrote: > I was annoyed by this as well. > It appears that just permuting the order of dependencies inclusion solves this > problem: > > first spark, then your cdh hadoop distro. > > HTH, > > Pierre > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-startup-time-out-tp1753p6582.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. >
Re: java.lang.StackOverflowError when calling count()
Responses inline. On Wed, Jul 23, 2014 at 4:13 AM, lalit1303 wrote: > Hi, > Thanks TD for your reply. I am still not able to resolve the problem for my > use case. > I have let's say 1000 different RDD's, and I am applying a transformation > function on each RDD and I want the output of all rdd's combined to a single > output RDD. For, this I am doing the following: > > ** > tempRDD = jaRDD.rdd().repartition(1).mapPartitions().toJavaRDD(); > *//creating new rdd in every loop* > outRDD = outRDD.union(tempRDD); *//keep joining RDD's to get the output into > a single RDD* > > *//after every 10 iteration, in order to truncate the lineage* > cachRDD = outRDD.cache(); > cachRDD.checkpoint(); > System.out.println(cachRDD.collect().size()); > outRDD = new JavaRDD(cachRDD.rdd(),cachRDD.classTag()); > ** > > *//finally after whole computation* > outRDD.saveAsTextFile(..) > > The above operations is overall slow, runs successfully when performed less > iterations i.e. ~100. But, when the num of iterations in increased to ~1000, > The whole job is taking more than *30 mins* and ultimately break down giving > OutOfMemory error. The total size of data is around 1.4 MB. As of now, I am > running the job on spark standalone mode with 2 cores and 2.9 GB memory. I think this is happening because how you are caching the output RDD that are being generated repeatedly. In every iteration, it is building this new union RDD which contains the data of the previous union RDD plus some new data. Since each of these union RDDs are cached, the underlying data is being cached repeatedly. So the cached Iteration 1: union RDD: X MB Iteration 2: union RDD: 2X MB | Total size cached: 3X Iteration 3: union RDD: 3X MB | Total size cached: 6X MB Iteration 4: union RDD: 4X MB | Total size cached: 10X MB ... If you do the math, that is a quadratic increase in the size of the data being processed and cached, wrt the # iterations. This leads to both increase in run time and memory usage. > I also observed that when collect() operation is performed, the number of > tasks keeps on increasing as the loop proceeds, like on first collect() 22 > total task, then ~40 total tasks ... ~300 task for single collect. > Does this means that all the operations are repeatedly performed, and RDD > lineage is not broken?? > Same reason as above. Each union RDD is build by appending the partitions of the previous union RDD plus the new set of partitions (~22 partitions). So Nth union RDD has N * 22 partitions, hence that many tasks. You could change this by also doing repartitioning when you want to cache+checkpoint the union RDD (therefore, outRDD.repartition(100).cache().checkpoint().count()). And do you really need all the data to be collected at the driver? If you are doing the cachRDD.collect() just to forced the checkpoint, then use cachRDD.count() > > Can you please elaborate on the point from your last post i.e. how to > perform: "*Create a modified RDD R` which has the same data as RDD R but > does not have the lineage. This is done by creating a new BlockRDD using the > ids of blocks of data representing the in-memory R*" > Please refer to the lines in the function: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala#L74 What those lines do is save the data of the associated RDD to HDFS files, and then create a new CheckpointRDD from the same files.Then the dependency of the associated RDD is changed to use the new RDD. 
This truncates the lineage because the associated RDD's parent is now the new RDD, which has a very short lineage (links to checkpoint files). And the previous dependencies (parent RDDs) are forgotten. This implementation can be modified by forcing the data of the associated RDD to be cached with StorageLevel.MEMORY_AND_DISK_2. And then instead of CheckpointRDD, you can create a new BlockRDD (using the names of the blocks that are used to cache the RDD), which is then set as the new dependency. This is definitely a behind-the-public-API implementation, that is > > > - > Lalit Yadav > la...@sigmoidanalytics.com > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p10488.html > Sent from the Apache Spark User List mailing list archive at Nabble.com.
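Putting the suggestions above together, a sketch of the loop (the checkpoint directory, interval, partition count and output path are placeholder values, and computeIteration stands in for the per-iteration work):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def run(sc: SparkContext): Unit = {
  sc.setCheckpointDir("hdfs:///tmp/checkpoints")            // placeholder path

  def computeIteration(i: Int): RDD[String] =               // placeholder work
    sc.parallelize(Seq("record-" + i))

  var outRDD: RDD[String] = sc.parallelize(Seq.empty[String])
  val checkpointInterval = 10

  for (i <- 1 to 1000) {
    outRDD = outRDD.union(computeIteration(i))
    if (i % checkpointInterval == 0) {
      // Compact, cache, checkpoint, and force it with count() instead of collect().
      outRDD = outRDD.repartition(100).cache()
      outRDD.checkpoint()
      outRDD.count()
    }
  }
  outRDD.saveAsTextFile("hdfs:///tmp/output")               // placeholder path
}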
Re: streaming sequence files?
Which deployment environment are you running the streaming programs in? Standalone? In that case you have to specify the max cores for each application, otherwise all the cluster resources may get consumed by one application. http://spark.apache.org/docs/latest/spark-standalone.html TD On Thu, Jul 24, 2014 at 4:57 PM, Barnaby wrote: > I have the streaming program writing sequence files. I can find one of the > files and load it in the shell using: > > scala> val rdd = sc.sequenceFile[String, > Int]("tachyon://localhost:19998/files/WordCounts/20140724-213930") > 14/07/24 21:47:50 INFO storage.MemoryStore: ensureFreeSpace(32856) called > with curMem=0, maxMem=309225062 > 14/07/24 21:47:50 INFO storage.MemoryStore: Block broadcast_0 stored as > values to memory (estimated size 32.1 KB, free 294.9 MB) > rdd: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1] at sequenceFile > at <console>:12 > > So I got some type information, seems good. > > It took a while to research but I got the following streaming code to > compile and run: > > val wordCounts = ssc.fileStream[String, Int, SequenceFileInputFormat[String, > Int]](args(0)) > > It works now and I offer this for reference to anybody else who may be > curious about saving sequence files and then streaming them back in. > > Question: > When running both streaming programs at the same time using spark-submit I > noticed that only one app would really run. To get the one app to continue I > had to stop the other app. Is there a way to get these running > simultaneously? > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/streaming-sequence-files-tp10557p10620.html > Sent from the Apache Spark User List mailing list archive at Nabble.com.
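Concretely, on a standalone cluster each application can be capped with spark.cores.max so that two streaming apps fit at once (the master URL and the value 2 below are placeholders):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("spark://master:7077")       // placeholder master URL
  .setAppName("sequence-file-stream")
  .set("spark.cores.max", "2")            // leave cores free for the other application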
Spark MLlib vs BIDMach Benchmark
BIDMach is a CPU- and GPU-accelerated machine learning library, also from Berkeley. https://github.com/BIDData/BIDMach/wiki/Benchmarks They benchmarked against Spark 0.9, and they claim it is significantly faster than Spark MLlib. In Spark 1.0, a lot of performance optimization has been done, and sparse data is supported. It will be interesting to see a new benchmark result. Is anyone familiar with BIDMach? Are they as fast as they claim? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
Re: Spark MLlib vs BIDMach Benchmark
These numbers are from GPUs and Intel MKL (a closed-source math library for Intel processors), where for CPU-bound algorithms you are going to get faster speeds than MLlib's JBLAS. However, there's in theory nothing preventing the use of these in MLlib (e.g. if you have a faster BLAS locally; adding a GPU-based one would probably require bigger code changes). Some of the numbers there are also from more naive implementations of K-means and logistic regression in the Spark research paper, which include the fairly expensive cost of reading the data out of HDFS. On July 26, 2014 at 8:31:11 PM, DB Tsai (dbt...@dbtsai.com) wrote: BIDMach is CPU and GPU-accelerated Machine Learning Library also from Berkeley. https://github.com/BIDData/BIDMach/wiki/Benchmarks They did benchmark against Spark 0.9, and they claimed that it's significantly faster than Spark MLlib. In Spark 1.0, lot of performance optimization had been done, and sparse data is supported. It will be interesting to see new benchmark result. Anyone familiar with BIDMach? Are they as fast as they claim? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
Re: Spark MLlib vs BIDMach Benchmark
BTW I should add that one other thing that would help MLlib locally would be doing model updates in batches. That is, instead of operating on one point at a time, group together a bunch of them and apply a matrix operation, which will allow more efficient use of BLAS or other linear algebra primitives. We don't do a lot of this yet, but there was a project in the AMPLab to do more of it. Multiple models can also be trained simultaneously with this approach. On July 26, 2014 at 11:21:17 PM, Matei Zaharia (matei.zaha...@gmail.com) wrote: These numbers are from GPUs and Intel MKL (a closed-source math library for Intel processors), where for CPU-bound algorithms you are going to get faster speeds than MLlib's JBLAS. However, there's in theory nothing preventing the use of these in MLlib (e.g. if you have a faster BLAS locally; adding a GPU-based one would probably require bigger code changes). Some of the numbers there are also from more naive implementations of K-means and logistic regression in the Spark research paper, which include the fairly expensive cost of reading the data out of HDFS. On July 26, 2014 at 8:31:11 PM, DB Tsai (dbt...@dbtsai.com) wrote: BIDMach is CPU and GPU-accelerated Machine Learning Library also from Berkeley. https://github.com/BIDData/BIDMach/wiki/Benchmarks They did benchmark against Spark 0.9, and they claimed that it's significantly faster than Spark MLlib. In Spark 1.0, lot of performance optimization had been done, and sparse data is supported. It will be interesting to see new benchmark result. Anyone familiar with BIDMach? Are they as fast as they claim? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
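As a small illustration of the batching idea for, say, logistic regression (using Breeze, which MLlib already depends on; shapes and names are illustrative only): the gradient for a whole mini-batch becomes one matrix-vector multiply instead of a loop over points, which is where an optimized BLAS can help:

import breeze.linalg.{DenseMatrix, DenseVector}
import breeze.numerics.sigmoid

// x: n x d matrix of examples, y: n labels, w: d weights.
def batchGradient(x: DenseMatrix[Double],
                  y: DenseVector[Double],
                  w: DenseVector[Double]): DenseVector[Double] = {
  val preds = sigmoid(x * w)   // n predictions from one matrix-vector product
  x.t * (preds - y)            // d-dimensional gradient from one more product
}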