Re: mapPartitionsWithIndex

2014-07-14 Thread Xiangrui Meng
You should return an iterator in mapPartitionsWithIndex. This is from
the programming guide
(http://spark.apache.org/docs/latest/programming-guide.html):

mapPartitionsWithIndex(func): Similar to mapPartitions, but also
provides func with an integer value representing the index of the
partition, so func must be of type (Int, Iterator<T>) => Iterator<U>
when running on an RDD of type T.

For your case, try something similar to the following:

val keyval=dRDD.mapPartitionsWithIndex { (ind,iter) =>
  iter.map(x => process(ind,x.trim().split(' ').map(_.toDouble),q,m,r))
}

-Xiangrui

On Sun, Jul 13, 2014 at 11:26 PM, Madhura  wrote:
> I have a text file consisting of a large number of random floating values
> separated by spaces. I am loading this file into an RDD in Scala.
>
> I have heard of mapPartitionsWithIndex but I haven't been able to implement
> it. For each partition I want to call a method (process in this case) to
> which I want to pass the partition and its respective index as parameters.
>
> My method returns a pair of values.
> This is what I have done.
>
> val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
> var ind:Int=0
> val keyval= dRDD.mapPartitionsWithIndex((ind,x) => process(ind,x,...))
> val res=keyval.collect()
>
> We are not able to access res(0)._1 and res(0)._2
>
> The error log is as follows.
>
> [error] SimpleApp.scala:420: value trim is not a member of Iterator[String]
> [error] Error occurred in an application involving default arguments.
> [error] val keyval=dRDD.mapPartitionsWithIndex( (ind,x) =>
> process(ind,x.trim().split(' ').map(_.toDouble),q,m,r))
> [error]
> ^
> [error] SimpleApp.scala:425: value mkString is not a member of
> Array[Nothing]
> [error]   println(res.mkString(""))
> [error]   ^
> [error] /SimpleApp.scala:427: value _1 is not a member of Nothing
> [error]   var final= res(0)._1
> [error] ^
> [error] /home/madhura/DTWspark/src/main/scala/SimpleApp.scala:428: value _2
> is not a member of Nothing
> [error]   var final1 = res(0)._2 - m +1
> [error]  ^
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitionsWithIndex-tp9590.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Error when testing with large sparse svm

2014-07-14 Thread crater
Hi,

I encountered an error when testing SVM (the example one) on very large sparse
data. The dataset I ran on was a toy dataset with only ten examples, but the
sparse vectors have 13 million dimensions with a few thousand non-zero entries each.

The error is shown below. I am wondering: is this a bug, or am I missing
something?

14/07/13 23:59:44 INFO SecurityManager: Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
14/07/13 23:59:44 INFO SecurityManager: Changing view acls to: chengjie
14/07/13 23:59:44 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(chengjie)
14/07/13 23:59:45 INFO Slf4jLogger: Slf4jLogger started
14/07/13 23:59:45 INFO Remoting: Starting remoting
14/07/13 23:59:45 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@master:53173]
14/07/13 23:59:45 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@master:53173]
14/07/13 23:59:45 INFO SparkEnv: Registering MapOutputTracker
14/07/13 23:59:45 INFO SparkEnv: Registering BlockManagerMaster
14/07/13 23:59:45 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20140713235945-c78f
14/07/13 23:59:45 INFO MemoryStore: MemoryStore started with capacity 14.4
GB.
14/07/13 23:59:45 INFO ConnectionManager: Bound socket to port 37674 with id
= ConnectionManagerId(master,37674)
14/07/13 23:59:45 INFO BlockManagerMaster: Trying to register BlockManager
14/07/13 23:59:45 INFO BlockManagerInfo: Registering block manager
master:37674 with 14.4 GB RAM
14/07/13 23:59:45 INFO BlockManagerMaster: Registered BlockManager
14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server
14/07/13 23:59:45 INFO HttpBroadcast: Broadcast server started at
http://10.10.255.128:41838
14/07/13 23:59:45 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-ac459d4b-a3c4-4577-bad4-576ac427d0bf
14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server
14/07/13 23:59:51 INFO SparkUI: Started SparkUI at http://master:4040
14/07/13 23:59:51 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/07/13 23:59:52 INFO EventLoggingListener: Logging events to
/tmp/spark-events/binaryclassification-with-params(hdfs---master-9001-splice.small,1,1.0,svm,l1,0.1)-1405317591776
14/07/13 23:59:52 INFO SparkContext: Added JAR
file:/home/chengjie/spark-1.0.1/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.3.0.jar
at http://10.10.255.128:54689/jars/spark-examples-1.0.1-hadoop2.3.0.jar with
timestamp 1405317592653
14/07/13 23:59:52 INFO AppClient$ClientActor: Connecting to master
spark://master:7077...
14/07/14 00:00:08 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
14/07/14 00:00:23 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
14/07/14 00:00:38 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
14/07/14 00:00:53 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
Training: 10
14/07/14 00:01:09 WARN BLAS: Failed to load implementation from:
com.github.fommil.netlib.NativeSystemBLAS
14/07/14 00:01:09 WARN BLAS: Failed to load implementation from:
com.github.fommil.netlib.NativeRefBLAS
*Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Serialized task 20:0 was 94453098 bytes which exceeds
spark.akka.frameSize (10485760 bytes). Consider using broadcast variables
for large values.*
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at ak

Re: Catalyst dependency on Spark Core

2014-07-14 Thread Yanbo Liang
Making Catalyst independent of Spark is a goal of Catalyst, but it may need time
and evolution.
I noticed that the package org.apache.spark.sql.catalyst.util
imports org.apache.spark.util.{Utils => SparkUtils},
so Catalyst has a dependency on Spark core.
I'm not sure whether it will be replaced by another component independent of
Spark in a later release.


2014-07-14 11:51 GMT+08:00 Aniket Bhatnagar :

> As per the recent presentation given in Scala days (
> http://people.apache.org/~marmbrus/talks/SparkSQLScalaDays2014.pdf), it
> was mentioned that Catalyst is independent of Spark. But on inspecting
> pom.xml of sql/catalyst module, it seems it has a dependency on Spark Core.
> Any particular reason for the dependency? I would love to use Catalyst
> outside Spark
>
> (reposted as previous email bounced. Sorry if this is a duplicate).
>


Re: Supported SQL syntax in Spark SQL

2014-07-14 Thread Martin Gammelsæter
I am very interested in the original question as well, is there any
list (even if it is simply in the code) of all supported syntax for
Spark SQL?

On Mon, Jul 14, 2014 at 6:41 AM, Nicholas Chammas
 wrote:
>> Are you sure the code running on the cluster has been updated?
>
> I launched the cluster using spark-ec2 from the 1.0.1 release, so I’m
> assuming that’s taken care of, at least in theory.
>
> I just spun down the clusters I had up, but I will revisit this tomorrow and
> provide the information you requested.
>
> Nick



-- 
Mvh.
Martin Gammelsæter
92209139


Re: Error when testing with large sparse svm

2014-07-14 Thread Xiangrui Meng
You need to set a larger `spark.akka.frameSize`, e.g., 128, for the
serialized weight vector. There is a JIRA about switching
automatically between sending through akka or broadcast:
https://issues.apache.org/jira/browse/SPARK-2361 . -Xiangrui
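
For reference, a minimal sketch (not from the original thread; the app name is a
placeholder, and 128 is just the MB value suggested above) of setting the frame
size before the SparkContext is created:

    import org.apache.spark.{SparkConf, SparkContext}

    // spark.akka.frameSize is in MB and must be set before the context exists.
    val conf = new SparkConf()
      .setAppName("LargeSparseSVM")
      .set("spark.akka.frameSize", "128")
    val sc = new SparkContext(conf)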

On Mon, Jul 14, 2014 at 12:15 AM, crater  wrote:
> Hi,
>
> I encounter an error when testing svm (example one) on very large sparse
> data. The dataset I ran on was a toy dataset with only ten examples but 13
> million sparse vector with a few thousands non-zero entries.
>
> The errors is showing below. I am wondering is this a bug or I am missing
> something?
>
> 14/07/13 23:59:44 INFO SecurityManager: Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 14/07/13 23:59:44 INFO SecurityManager: Changing view acls to: chengjie
> 14/07/13 23:59:44 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(chengjie)
> 14/07/13 23:59:45 INFO Slf4jLogger: Slf4jLogger started
> 14/07/13 23:59:45 INFO Remoting: Starting remoting
> 14/07/13 23:59:45 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://spark@master:53173]
> 14/07/13 23:59:45 INFO Remoting: Remoting now listens on addresses:
> [akka.tcp://spark@master:53173]
> 14/07/13 23:59:45 INFO SparkEnv: Registering MapOutputTracker
> 14/07/13 23:59:45 INFO SparkEnv: Registering BlockManagerMaster
> 14/07/13 23:59:45 INFO DiskBlockManager: Created local directory at
> /tmp/spark-local-20140713235945-c78f
> 14/07/13 23:59:45 INFO MemoryStore: MemoryStore started with capacity 14.4
> GB.
> 14/07/13 23:59:45 INFO ConnectionManager: Bound socket to port 37674 with id
> = ConnectionManagerId(master,37674)
> 14/07/13 23:59:45 INFO BlockManagerMaster: Trying to register BlockManager
> 14/07/13 23:59:45 INFO BlockManagerInfo: Registering block manager
> master:37674 with 14.4 GB RAM
> 14/07/13 23:59:45 INFO BlockManagerMaster: Registered BlockManager
> 14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server
> 14/07/13 23:59:45 INFO HttpBroadcast: Broadcast server started at
> http://10.10.255.128:41838
> 14/07/13 23:59:45 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-ac459d4b-a3c4-4577-bad4-576ac427d0bf
> 14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server
> 14/07/13 23:59:51 INFO SparkUI: Started SparkUI at http://master:4040
> 14/07/13 23:59:51 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 14/07/13 23:59:52 INFO EventLoggingListener: Logging events to
> /tmp/spark-events/binaryclassification-with-params(hdfs---master-9001-splice.small,1,1.0,svm,l1,0.1)-1405317591776
> 14/07/13 23:59:52 INFO SparkContext: Added JAR
> file:/home/chengjie/spark-1.0.1/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.3.0.jar
> at http://10.10.255.128:54689/jars/spark-examples-1.0.1-hadoop2.3.0.jar with
> timestamp 1405317592653
> 14/07/13 23:59:52 INFO AppClient$ClientActor: Connecting to master
> spark://master:7077...
> 14/07/14 00:00:08 WARN TaskSchedulerImpl: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient memory
> 14/07/14 00:00:23 WARN TaskSchedulerImpl: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient memory
> 14/07/14 00:00:38 WARN TaskSchedulerImpl: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient memory
> 14/07/14 00:00:53 WARN TaskSchedulerImpl: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient memory
> Training: 10
> 14/07/14 00:01:09 WARN BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemBLAS
> 14/07/14 00:01:09 WARN BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefBLAS
> *Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Serialized task 20:0 was 94453098 bytes which exceeds
> spark.akka.frameSize (10485760 bytes). Consider using broadcast variables
> for large values.*
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>

Re: Graphx traversal and merge interesting edges

2014-07-14 Thread HHB
Hi Ankur,

FYI - in a naive attempt to enhance your solution, I managed to create 
MergePatternPath. I think it works in the expected way (at least for the traversal 
problem in the last email). 

I modified your code a bit. Also, instead of EdgePattern I used a List of 
functions that match the whole edge triplets along the path... and it returns a 
*new Graph* which preserves the vertex attributes, but only with the new merged 
edges.

MergePatternPath:
https://github.com/hihellobolke/spark/blob/graphx-traversal/graphx/src/main/scala/org/apache/spark/graphx/lib/MergePatternPath.scala

Here's a Gist of how I was using it:
https://gist.github.com/hihellobolke/c8e6c97cefed714258ad

This is probably a very naive attempt :-). Is there any possibility of adding something 
like it to graphx.lib, albeit in a more sophisticated & performant form?

Thanks

On 08-Jul-2014, at 4:57 pm, HHB  wrote:

> Hi Ankur,
> 
> I was trying out the PatterMatcher it works for smaller path, but I see that 
> for the longer ones it continues to run forever...
> 
> Here's what I am trying: 
> https://gist.github.com/hihellobolke/dd2dc0fcebba485975d1  (The example of 3 
> share traders transacting in appl shares)
> 
> The first edge pattern list (Line 66) works okay, but the second one (Line 
> 76) never return..
> 
> Thanks,
> Gautam
> 
> 
> On 05-Jul-2014, at 3:23 pm, Ankur Dave  wrote:
> 
>> Interesting problem! My understanding is that you want to (1) find paths 
>> matching a particular pattern, and (2) add edges between the start and end 
>> vertices of the matched paths.
>> 
>> For (1), I implemented a pattern matcher for GraphX that iteratively 
>> accumulates partial pattern matches. I used your example in the unit test.
>> 
>> For (2), you can take the output of the pattern matcher (the set of matching 
>> paths organized by their terminal vertices) and construct a set of new edges 
>> using the initial and terminal vertices of each path. Then you can make a 
>> new graph consisting of the union of the original edge set and the new 
>> edges. Let me know if you'd like help with this.
>> 
>> Ankur
>> 
> 



Re: mapPartitionsWithIndex

2014-07-14 Thread Madhura
It worked! I was struggling for a week. Thanks a lot!


On Mon, Jul 14, 2014 at 12:31 PM, Xiangrui Meng [via Apache Spark User
List]  wrote:

> You should return an iterator in mapPartitionsWIthIndex. This is from
> the programming guide
> (http://spark.apache.org/docs/latest/programming-guide.html):
>
> mapPartitionsWithIndex(func): Similar to mapPartitions, but also
> provides func with an integer value representing the index of the
> partition, so func must be of type (Int, Iterator) => Iterator
> when running on an RDD of type T.
>
> For your case, try something similar to the following:
>
> val keyval=dRDD.mapPartitionsWithIndex { (ind,iter) =>
>   iter.map(x => process(ind,x.trim().split(' ').map(_.toDouble),q,m,r))
> }
>
> -Xiangrui
>
> On Sun, Jul 13, 2014 at 11:26 PM, Madhura <[hidden email]
> > wrote:
>
> > I have a text file consisting of a large number of random floating
> values
> > separated by spaces. I am loading this file into a RDD in scala.
> >
> > I have heard of mapPartitionsWithIndex but I haven't been able to
> implement
> > it. For each partition I want to call a method(process in this case) to
> > which I want to pass the partition and it's respective index as
> parameters.
> >
> > My method returns a pair of values.
> > This is what I have done.
> >
> > val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
> > var ind:Int=0
> > val keyval= dRDD.mapPartitionsWithIndex((ind,x) => process(ind,x,...))
> > val res=keyval.collect()
> >
> > We are not able to access res(0)._1 and res(0)._2
> >
> > The error log is as follows.
> >
> > [error] SimpleApp.scala:420: value trim is not a member of
> Iterator[String]
> > [error] Error occurred in an application involving default arguments.
> > [error] val keyval=dRDD.mapPartitionsWithIndex( (ind,x) =>
> > process(ind,x.trim().split(' ').map(_.toDouble),q,m,r))
> > [error]
> > ^
> > [error] SimpleApp.scala:425: value mkString is not a member of
> > Array[Nothing]
> > [error]   println(res.mkString(""))
> > [error]   ^
> > [error] /SimpleApp.scala:427: value _1 is not a member of Nothing
> > [error]   var final= res(0)._1
> > [error] ^
> > [error] /home/madhura/DTWspark/src/main/scala/SimpleApp.scala:428: value
> _2
> > is not a member of Nothing
> > [error]   var final1 = res(0)._2 - m +1
> > [error]  ^
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitionsWithIndex-tp9590.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitionsWithIndex-tp9590p9598.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

spark1.0.1 catalyst transform filter not push down

2014-07-14 Thread victor sheng
Hi, I encountered a weird problem in Spark SQL.
I use sbt/sbt hive/console to go into the shell.

I am testing filter push-down using Catalyst.

scala>  val queryPlan = sql("select value from (select key,value from src)a
where a.key=86 ")
scala> queryPlan.baseLogicalPlan
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project ['value]
 Filter ('a.key = 86)
  Subquery a
   Project ['key,'value]
UnresolvedRelation None, src, None

I want to achieve the "Filter Push Down".

So I run :
scala> var newQuery = queryPlan.baseLogicalPlan transform {
 | case f @ Filter(_, p @ Project(_,grandChild)) 
 | if (f.references subsetOf grandChild.output) => 
 | p.copy(child = f.copy(child = grandChild))
 | }
:42: error: type mismatch;
 found   : Seq[org.apache.spark.sql.catalyst.expressions.Attribute]
 required:
scala.collection.GenSet[org.apache.spark.sql.catalyst.expressions.Attribute]
   if (f.references subsetOf grandChild.output) => 
^
It throws the exception above. I don't know what's wrong.

If I run :
var newQuery = queryPlan.baseLogicalPlan transform {
case f @ Filter(_, p @ Project(_,grandChild)) 
if true => 
p.copy(child = f.copy(child = grandChild))
}
scala> var newQuery = queryPlan.baseLogicalPlan transform {
 | case f @ Filter(_, p @ Project(_,grandChild)) 
 | if true => 
 | p.copy(child = f.copy(child = grandChild))
 | }
newQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = 
Project ['value]
 Filter ('a.key = 86)
  Subquery a
   Project ['key,'value]
UnresolvedRelation None, src, None

It seems the Filter is still in the same position; the order did not switch.
Can anyone guide me on this?
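
(For what it's worth, a minimal, untested sketch of one way around the type
mismatch above: f.references is a Set, while grandChild.output is a
Seq[Attribute], so converting the output to a Set satisfies subsetOf:)

    var newQuery = queryPlan.baseLogicalPlan transform {
      case f @ Filter(_, p @ Project(_, grandChild))
        // output is a Seq; subsetOf expects a set on the right-hand side
        if f.references subsetOf grandChild.output.toSet =>
          p.copy(child = f.copy(child = grandChild))
    }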




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-catalyst-transform-filter-not-push-down-tp9599.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


sbt + idea + test

2014-07-14 Thread boci
Hi guys,


I want to use Elasticsearch and HBase in my Spark project, and I want to create
a test. I pulled up ES and ZooKeeper, but if I put "val htest = new
HBaseTestingUtility()" into my app I get a strange exception (at compilation
time, not runtime).

https://gist.github.com/b0c1/4a4b3f6350816090c3b5

Any idea?

--
Skype: boci13, Hangout: boci.b...@gmail.com


Running Spark on Microsoft Azure HDInsight

2014-07-14 Thread Niek Tax
Hi everyone,

Currently I am working on parallelizing a machine learning algorithm using
a Microsoft HDInsight cluster. I tried running my algorithm on Hadoop
MapReduce, but since my algorithm is iterative the job scheduling overhead
and data loading overhead severely limits the performance of my algorithm
in terms of training time.

Recently, HDInsight added support for Hadoop 2 with YARN, which I thought
would allow me to run Spark jobs, which seem a better fit for my task. So
far, however, I have not been able to find out how I can run Apache Spark jobs
on an HDInsight cluster.

It seems like remote job submission (which would have my preference) is not
possible for Spark on HDInsight, as the REST endpoints for Oozie and Templeton
do not seem to support submission of Spark jobs. I also tried to RDP to the
headnode to submit jobs from there. On the headnode drives I can
find other new YARN computation models like Tez, and I also managed to run
Tez jobs through YARN. However, Spark seems to be missing. Does this
mean that HDInsight currently does not support Spark, even though it
supports Hadoop versions with YARN? Or do I need to install Spark on the
HDInsight cluster first, in some way? Or is there maybe something else that
I'm missing, and can I run Spark jobs on HDInsight some other way?

Many thanks in advance!


Kind regards,

Niek Tax


Spark SQL 1.0.1 error on reading fixed length byte array

2014-07-14 Thread Pei-Lun Lee
Hi,

I am using spark-sql 1.0.1 to load Parquet files generated from the method
described in:

https://gist.github.com/massie/7224868


When I try to submit a select query with columns of type fixed length byte
array, the following error pops up:


14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at
basicOperators.scala:100
org.apache.spark.SparkDriverExecutionException: Execution error
at
org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581)
at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559)
Caused by: parquet.io.ParquetDecodingException: Can not read value at 0 in
block -1 in file s3n://foo/bar/part-r-0.snappy.parquet
at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177)
at
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to
(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at
org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574)
... 1 more
Caused by: java.lang.ClassCastException: Expected instance of primitive
converter but got
"org.apache.spark.sql.parquet.CatalystNativeArrayConverter"
at parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30)
at
parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:264)
at
parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60)
at
parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74)
at
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110)
at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
... 24 more


Is fixed length byte array supposed to work in this version? I noticed that
other array types like int or string already work.

Thanks,
--
Pei-Lun


Error in spark: Exception in thread "delete Spark temp dir"

2014-07-14 Thread Rahul Bhojwani
I am getting an error saying:

 Exception in thread "delete Spark temp dir
C:\Users\shawn\AppData\Local\Temp\spark-b4f1105c-d67b-488c-83f9-eff1d1b95786"
java.io.IOException: Failed to delete:
C:\Users\shawn\AppData\Local\Temp\spark-b4f1105c-d67b-488c-83f9-eff1d1b95786\tmppr36zu
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:483)
at
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:479)
at
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:478)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:478)
at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:212)


Can anyone help me out with this? If the logs are required, I can forward
them.

-- 
Rahul K Bhojwani
3rd Year B.Tech
Computer Science and Engineering
National Institute of Technology, Karnataka


Re: Problem reading in LZO compressed files

2014-07-14 Thread Ognen Duzlevski
Nicholas, thanks nevertheless! I am going to spend some time to try and 
figure this out and report back :-)

Ognen

On 7/13/14, 7:05 PM, Nicholas Chammas wrote:


I actually never got this to work, which is part of the reason why I 
filed that JIRA. Apart from using --jar when starting the shell, I 
don’t have any more pointers for you. :(




On Sun, Jul 13, 2014 at 12:57 PM, Ognen Duzlevski 
<ognen.duzlev...@gmail.com> wrote:


Nicholas,

Thanks!

How do I make spark assemble against a local version of Hadoop?

I have 2.4.1 running on a test cluster and I did
"SPARK_HADOOP_VERSION=2.4.1 sbt/sbt assembly" but all it did was
pull in hadoop-2.4.1 dependencies via sbt (which is sufficient for
using a 2.4.1 HDFS). I am guessing my local version of Hadoop
libraries/jars is not used. Alternatively, how do I add the
hadoop-gpl-compression-0.1.0.jar (responsible for the lzo stuff)
to this hand assembled Spark?

I am running the spark-shell like this:
bin/spark-shell --jars
/home/ec2-user/hadoop/lib/hadoop-gpl-compression-0.1.0.jar

and getting this:

scala> val f =
sc.newAPIHadoopFile("hdfs://10.10.0.98:54310/data/1gram.lzo

",classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text])
14/07/13 16:53:01 INFO MemoryStore: ensureFreeSpace(216014) called
with curMem=0, maxMem=311387750
14/07/13 16:53:01 INFO MemoryStore: Block broadcast_0 stored as
values to memory (estimated size 211.0 KB, free 296.8 MB)
f: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable,
org.apache.hadoop.io.Text)] = NewHadoopRDD[0] at newAPIHadoopFile
at :12

scala> f.take(1)
14/07/13 16:53:08 INFO FileInputFormat: Total input paths to
process : 1
java.lang.IncompatibleClassChangeError: Found interface
org.apache.hadoop.mapreduce.JobContext, but class was expected
at

com.hadoop.mapreduce.LzoTextInputFormat.listStatus(LzoTextInputFormat.java:67)

which makes me think something is not linked to something properly
(not a Java expert unfortunately).

Thanks!
Ognen



On 7/13/14, 10:35 AM, Nicholas Chammas wrote:


If you’re still seeing gibberish, it’s because Spark is not using
the LZO libraries properly. In your case, I believe you should be
calling newAPIHadoopFile() instead of textFile().

For example:


sc.newAPIHadoopFile("s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data",
   classOf[com.hadoop.mapreduce.LzoTextInputFormat],
   classOf[org.apache.hadoop.io.LongWritable],
   classOf[org.apache.hadoop.io.Text])

On a side note, here’s a related JIRA issue: SPARK-2394: Make it
easier to read LZO-compressed files from EC2 clusters


Nick



On Sun, Jul 13, 2014 at 10:49 AM, Ognen Duzlevski
<ognen.duzlev...@gmail.com> wrote:

Hello,

I have been trying to play with the Google ngram dataset
provided by Amazon in form of LZO compressed files.

I am having trouble understanding what is going on ;). I have
added the compression jar and native library to the
underlying Hadoop/HDFS installation, restarted the name node
and the datanodes, Spark can obviously see the file but I get
gibberish on a read. Any ideas?

See output below:

14/07/13 14:39:19 INFO SparkContext: Added JAR
file:/home/ec2-user/hadoop/lib/hadoop-gpl-compression-0.1.0.jar
at
http://10.10.0.100:40100/jars/hadoop-gpl-compression-0.1.0.jar with
timestamp 1405262359777
14/07/13 14:39:20 INFO SparkILoop: Created spark context..
Spark context available as sc.

scala> val f =
sc.textFile("hdfs://10.10.0.98:54310/data/1gram.lzo
")
14/07/13 14:39:34 INFO MemoryStore: ensureFreeSpace(163793)
called with curMem=0, maxMem=311387750
14/07/13 14:39:34 INFO MemoryStore: Block broadcast_0 stored
as values to memory (estimated size 160.0 KB, free 296.8 MB)
f: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at
textFile at :12

scala> f.take(10)
14/07/13 14:39:43 INFO SparkContext: Job finished: take at
:15, took 0.419708348 s
res0: Array[String] =

Array(SEQ?!org.apache.hadoop.io.LongWritable?org.apache.hadoop.io.Text??#com.hadoop.compression.lzo.LzoCodec���\�??�?@�?A�?B�?C�?D�?E�?F�?G�?H�?I�?J�?K�?L�?M�?N�?O�?P�?Q�?R�?S�?T�?U�?V�?W�?X�?Y�?Z�?[�?\�?]�?^�?_�?`�?a�?b�?c�?d�?e�?f�?g�?h�?i�?j�?k�?l�?m�?n�?o�?p�?q�?r�?s�?t�?u�?v�?w�?x�?y�?z�?{�?|�?}�?~�?
�?��?��?��?��?��?��?��?��?��?��?��?��?��?��?��?��?��?...

Thanks!
Ognen









Can we get a spark context inside a mapper

2014-07-14 Thread Rahul Bhojwani
Hey, my question is about this situation:
Suppose we have 10 files, each containing a list of features in each row.

The task is: for each file, cluster the features in that file and write the
corresponding cluster along with them to a new file. So we have to generate
10 more files by applying clustering to each file individually.

So can I do it this way: get an RDD of the list of files and apply map, and
inside the mapper function, which will be handling each file, get another
Spark context and use MLlib k-means to produce the clustered output file?

Please suggest the appropriate method to tackle this problem.

Thanks,
Rahul Kumar Bhojwani
3rd year, B.Tech
Computer Science Engineering
National Institute Of Technology, Karnataka
9945197359


Re: Spark Questions

2014-07-14 Thread Gonzalo Zarza
Thanks for your answers Shuo Xiang and Aaron Davidson!

Regards,


--
*Gonzalo Zarza* | PhD in High-Performance Computing | Big-Data Specialist |
*GLOBANT* | AR: +54 11 4109 1700 ext. 15494 | US: +1 877 215 5230 ext. 15494


On Sat, Jul 12, 2014 at 9:02 PM, Aaron Davidson  wrote:

> I am not entirely certain I understand your questions, but let me assume
> you are mostly interested in SparkSQL and are thinking about your problem
> in terms of SQL-like tables.
>
> 1. Shuo Xiang mentioned Spark partitioning strategies, but in case you are
> talking about data partitioning or sharding as exist in Hive, SparkSQL does
> not currently support this, though it is on the roadmap. We can read from
> partitioned Hive tables, however.
>
> 2. If by entries/record you mean something like columns/row, SparkSQL does
> allow you to project out the columns you want, or select all columns. The
> efficiency of such a projection is determined by the how the data is
> stored, however: If your data is stored in an inherently row-based format,
> this projection will be no faster than doing an initial map() over the data
> to only select the desired columns. If it's stored in something like
> Parquet, or cached in memory, however, we would avoid ever looking at the
> unused columns.
>
> 3. Spark has a very generalized data source API, so it is capable of
> interacting with whatever data source. However, I don't think we currently
> have any SparkSQL connectors to RDBMSes that would support column pruning
> or other push-downs. This is all very much viable, however.
>
>
> On Fri, Jul 11, 2014 at 1:35 PM, Gonzalo Zarza 
> wrote:
>
>> Hi all,
>>
>> We've been evaluating Spark for a long-term project. Although we've been
>> reading several topics in forum, any hints on the following topics we'll be
>> extremely welcomed:
>>
>> 1. Which are the data partition strategies available in Spark? How
>> configurable are these strategies?
>>
>> 2. How would be the best way to use Spark if queries can touch only 3-5
>> entries/records? Which strategy is the best if they want to perform a full
>> scan of the entries?
>>
>> 3. Is Spark capable of interacting with RDBMS?
>>
>> Thanks a lot!
>>
>> Best regards,
>>
>> --
>> *Gonzalo Zarza* | PhD in High-Performance Computing | Big-Data
>> Specialist |
>> *GLOBANT* | AR: +54 11 4109 1700 ext. 15494 | US: +1 877 215 5230 ext.
>> 15494 | [image: Facebook]  [image:
>> Twitter]  [image: Youtube]
>>  [image: Linkedin]
>>  [image: Pinterest]
>>  [image: Globant]
>> 
>>
>
>


Re: spark1.0.1 catalyst transform filter not push down

2014-07-14 Thread Yin Huai
Hi,

queryPlan.baseLogicalPlan is not the plan used for execution. Actually, the
baseLogicalPlan of a SchemaRDD (queryPlan in your case) is just the parsed plan
(the parsed plan will be analyzed and then optimized; finally, a physical plan
will be created). The plan that shows up after you execute "val queryPlan =
sql("select value from (select key,value from src)a where a.key=86 ")" is the
physical plan. Or, you can use queryPlan.queryExecution to see the Logical Plan,
Optimized Logical Plan, and Physical Plan. You can find that the physical plan
is

== Physical Plan ==
Project [value#3:0]
 Filter (key#2:1 = 86)
  HiveTableScan [value#3,key#2], (MetastoreRelation default, src, None),
None
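
(A minimal sketch, assuming the same hive/console session and the 1.0.1
SchemaRDD API, of inspecting those stages directly:)

    val queryPlan = sql("select value from (select key,value from src)a where a.key=86 ")
    // Prints the logical, analyzed, optimized, and physical plans together:
    println(queryPlan.queryExecution)
    // Or look at individual stages:
    println(queryPlan.queryExecution.optimizedPlan)
    println(queryPlan.queryExecution.executedPlan)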

Thanks,

Yin



On Mon, Jul 14, 2014 at 3:42 AM, victor sheng 
wrote:

> Hi, I encountered a weird problem in spark sql.
> I use sbt/sbt hive/console  to go into the shell.
>
> I test the filter push down by using catalyst.
>
> scala>  val queryPlan = sql("select value from (select key,value from src)a
> where a.key=86 ")
> scala> queryPlan.baseLogicalPlan
> res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Project ['value]
>  Filter ('a.key = 86)
>   Subquery a
>Project ['key,'value]
> UnresolvedRelation None, src, None
>
> I want to achieve the "Filter Push Down".
>
> So I run :
> scala> var newQuery = queryPlan.baseLogicalPlan transform {
>  | case f @ Filter(_, p @ Project(_,grandChild))
>  | if (f.references subsetOf grandChild.output) =>
>  | p.copy(child = f.copy(child = grandChild))
>  | }
> :42: error: type mismatch;
>  found   : Seq[org.apache.spark.sql.catalyst.expressions.Attribute]
>  required:
>
> scala.collection.GenSet[org.apache.spark.sql.catalyst.expressions.Attribute]
>if (f.references subsetOf grandChild.output) =>
> ^
> It throws exception above. I don't know what's wrong.
>
> If I run :
> var newQuery = queryPlan.baseLogicalPlan transform {
> case f @ Filter(_, p @ Project(_,grandChild))
> if true =>
> p.copy(child = f.copy(child = grandChild))
> }
> scala> var newQuery = queryPlan.baseLogicalPlan transform {
>  | case f @ Filter(_, p @ Project(_,grandChild))
>  | if true =>
>  | p.copy(child = f.copy(child = grandChild))
>  | }
> newQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Project ['value]
>  Filter ('a.key = 86)
>   Subquery a
>Project ['key,'value]
> UnresolvedRelation None, src, None
>
> It seems the Filter also in the same position, not switch the order.
> Can anyone guide me about it?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-catalyst-transform-filter-not-push-down-tp9599.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Running Spark on Microsoft Azure HDInsight

2014-07-14 Thread Marco Shaw
I'm a Spark and HDInsight novice, so I could be wrong...

HDInsight is based on HDP2, so my guess here is that you have the option of
installing/configuring Spark in cluster mode (YARN) or in standalone mode
and packaging the Spark binaries with your job.

Everything I seem to look at is related to UNIX shell scripts.  So, one
might need to pull apart some of these scripts to pick out how to run this
on Windows.

Interesting project...

Marco



On Mon, Jul 14, 2014 at 8:00 AM, Niek Tax  wrote:

> Hi everyone,
>
> Currently I am working on parallelizing a machine learning algorithm using
> a Microsoft HDInsight cluster. I tried running my algorithm on Hadoop
> MapReduce, but since my algorithm is iterative the job scheduling overhead
> and data loading overhead severely limits the performance of my algorithm
> in terms of training time.
>
> Since recently, HDInsight supports Hadoop 2 with YARN, which I thought
> would allow me to use run Spark jobs, which seem more fitting for my task. So
> far I have not been able however to find how I can run Apache Spark jobs on
> a HDInsight cluster.
>
> It seems like remote job submission (which would have my preference) is
> not possible for Spark on HDInsight, as REST endpoints for Oozie and
> templeton do not seem to support submission of Spark jobs. I also tried to
> RDP to the headnode for job submission from the headnode. On the headnode
> drives I can find other new YARN computation models like Tez and I also
> managed to run Tez jobs on it through YARN. However, Spark seems to be
> missing. Does this mean that HDInsight currently does not support Spark,
> even though it supports Hadoop versions with YARN? Or do I need to install
> Spark on the HDInsight cluster first, in some way? Or is there maybe
> something else that I'm missing and can I run Spark jobs on HDInsight some
> other way?
>
> Many thanks in advance!
>
>
> Kind regards,
>
> Niek Tax
>


Re: Announcing Spark 1.0.1

2014-07-14 Thread Philip Ogren

Hi Patrick,

This is great news but I nearly missed the announcement because it had 
scrolled off the folder view that I have Spark users list messages go 
to.  40+ new threads since you sent the email out on Friday evening.


You might consider having someone on your team create a 
spark-announcement list so that it is easier to disseminate important 
information like this release announcement.


Thanks again for all your hard work.  I know you and the rest of the 
team are getting a million requests a day.


Philip


On 07/11/2014 07:35 PM, Patrick Wendell wrote:

I am happy to announce the availability of Spark 1.0.1! This release
includes contributions from 70 developers. Spark 1.0.1 includes fixes
across several areas of Spark, including the core API, PySpark, and
MLlib. It also includes new features in Spark's (alpha) SQL library,
including support for JSON data and performance and stability fixes.

Visit the release notes[1] to read about this release or download[2]
the release today.

[1] http://spark.apache.org/releases/spark-release-1-0-1.html
[2] http://spark.apache.org/downloads.html




Re: Running Spark on Microsoft Azure HDInsight

2014-07-14 Thread Marco Shaw
Looks like going with cluster mode is not a good idea:
http://azure.microsoft.com/en-us/documentation/articles/hdinsight-administer-use-management-portal/

Seems like a non-HDInsight VM might be needed to make it the Spark master
node.

Marco



On Mon, Jul 14, 2014 at 12:43 PM, Marco Shaw  wrote:

> I'm a Spark and HDInsight novice, so I could be wrong...
>
> HDInsight is based on HDP2, so my guess here is that you have the option
> of installing/configuring Spark in cluster mode (YARN) or in standalone
> mode and package the Spark binaries with your job.
>
> Everything I seem to look at is related to UNIX shell scripts.  So, one
> might need to pull apart some of these scripts to pick out how to run this
> on Windows.
>
> Interesting project...
>
> Marco
>
>
>
> On Mon, Jul 14, 2014 at 8:00 AM, Niek Tax  wrote:
>
>> Hi everyone,
>>
>> Currently I am working on parallelizing a machine learning algorithm
>> using a Microsoft HDInsight cluster. I tried running my algorithm on Hadoop
>> MapReduce, but since my algorithm is iterative the job scheduling overhead
>> and data loading overhead severely limits the performance of my algorithm
>> in terms of training time.
>>
>> Since recently, HDInsight supports Hadoop 2 with YARN, which I thought
>> would allow me to use run Spark jobs, which seem more fitting for my task. So
>> far I have not been able however to find how I can run Apache Spark jobs on
>> a HDInsight cluster.
>>
>> It seems like remote job submission (which would have my preference) is
>> not possible for Spark on HDInsight, as REST endpoints for Oozie and
>> templeton do not seem to support submission of Spark jobs. I also tried to
>> RDP to the headnode for job submission from the headnode. On the headnode
>> drives I can find other new YARN computation models like Tez and I also
>> managed to run Tez jobs on it through YARN. However, Spark seems to be
>> missing. Does this mean that HDInsight currently does not support Spark,
>> even though it supports Hadoop versions with YARN? Or do I need to install
>> Spark on the HDInsight cluster first, in some way? Or is there maybe
>> something else that I'm missing and can I run Spark jobs on HDInsight some
>> other way?
>>
>> Many thanks in advance!
>>
>>
>> Kind regards,
>>
>> Niek Tax
>>
>
>


RE: writing FLume data to HDFS

2014-07-14 Thread Sundaram, Muthu X.
I am not sure how to write it… I tried writing to the local file system using 
FileWriter and PrintWriter, inside the while loop. I am able to get the text and 
print it, but it fails when I use regular Java classes. Shouldn’t I use regular 
Java classes here? Can I write only to HDFS? Do I have to create the file in HDFS 
using HDFS classes? I thought of using Spark’s saveAsTextFile(), but I have a 
JavaRDD of SparkFlumeEvent, not a JavaRDD of String, so I am not sure whether 
saveAsTextFile() will work. I appreciate any guidance here. How do I get more 
code examples? Books, URLs?


  flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
      @Override
      public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
          String logRecord = null;
          List<SparkFlumeEvent> events = eventsData.collect();
          Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
          long t1 = System.currentTimeMillis();
          AvroFlumeEvent avroEvent = null;
          ByteBuffer bytePayload = null;
          // All the user level data is carried as payload in the Flume Event
          while (batchedEvents.hasNext()) {
              SparkFlumeEvent flumeEvent = batchedEvents.next();
              avroEvent = flumeEvent.event();
              bytePayload = avroEvent.getBody();
              logRecord = new String(bytePayload.array());

              System.out.println("LOG RECORD = " + logRecord);

              ??I was trying to write the data to hdfs..but it fails…


From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Friday, July 11, 2014 1:43 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: writing FLume data to HDFS

What is the error you are getting when you say "??I was trying to write the 
data to hdfs..but it fails…"

TD

On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. 
<muthu.x.sundaram@sabre.com> wrote:
I am new to spark. I am trying to do the following.
Netcat-->Flume-->Spark streaming(process Flume Data)-->HDFS.

My flume config file has following set up.

Source = netcat
Sink=avrosink.

Spark Streaming code:
I am able to print data from flume to the monitor. But I am struggling to 
create a file. In order to get the real data I need to convert SparkEvent to 
avroEvent.
JavaRDD.saveAsText()-->might not work..because JavaRDD is collection of 
SparkEvent..Do I need to convert this in to collection of JavaRDD?
Please share any code examples… Thanks.

Code:

 Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, 
host, port);

flumeStream.count();
flumeStream.foreachRDD(new 
Function2<JavaRDD<SparkFlumeEvent>,JavaRDD<SparkFlumeEvent>,Void>(){
 @Override
 public Void call(JavaRDD<SparkFlumeEvent> events1, JavaRDD<SparkFlumeEvent> 
events2) throws Exception{
events1.saveasTextFile("output.txt");
return null;
 }
 });

/*flumeStream.count().map(new Function<Long, String>() {
  @Override
  public String call(Long in) {
return "Received " + in + " flume events.";
  }
}).print();*/

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
     String logRecord = null;
     List<SparkFlumeEvent> events = eventsData.collect();
     Iterator<SparkFlumeEvent> batchedEvents = events.iterator();


 long t1 = System.currentTimeMillis();
 AvroFlumeEvent avroEvent = null;
 ByteBuffer bytePayload = null;

 // All the user level data is carried as payload in Flume 
Event

 while(batchedEvents.hasNext()) {
SparkFlumeEvent flumeEvent = batchedEvents.next();
avroEvent = flumeEvent.event();
bytePayload = avroEvent.getBody();
logRecord = new String(bytePayload.array());

System.out.println("LOG RECORD = " + 
logRecord);

   ??I was trying to write the data to hdfs..but it 
fails…



 }
 System.out.println("Processed this batch in: " + 
(System.currentTimeMillis() - t1)/1000 + " seconds");
 return null;
  }
 });




Re: Potential bugs in SparkSQL

2014-07-14 Thread Yin Huai
I have opened https://issues.apache.org/jira/browse/SPARK-2474 to track
this bug. I will also explain my understanding of the root cause.


On Thu, Jul 10, 2014 at 6:03 PM, Michael Armbrust 
wrote:

> Hmm, yeah looks like the table name is not getting applied to the
> attributes of m.  You can work around this by rewriting your query as:
> hql("select s.id from (SELECT * FROM m) m join s on (s.id=m.id) order by
> s.id"
>
> This explicitly gives the alias m to the attributes of that table. You can
> also open a JIRA and we can look in to the root cause in more detail.
>
> Michael
>
>
> On Thu, Jul 10, 2014 at 5:45 PM, Jerry Lam  wrote:
>
>> Hi Michael,
>>
>> I got the log you asked for. Note that I manually edited the table name
>> and the field names to hide some sensitive information.
>>
>> == Logical Plan ==
>> Project ['s.id]
>>  Join Inner, Some((id#106 = 'm.id))
>>   Project [id#96 AS id#62]
>>MetastoreRelation test, m, None
>>   MetastoreRelation test, s, Some(s)
>>
>> == Optimized Logical Plan ==
>> Project ['s.id]
>>  Join Inner, Some((id#106 = 'm.id))
>>   Project []
>>MetastoreRelation test, m, None
>>   Project [id#106]
>>MetastoreRelation test, s, Some(s)
>>
>> == Physical Plan ==
>> Project ['s.id]
>>  Filter (id#106:0 = 'm.id)
>>   CartesianProduct
>>HiveTableScan [], (MetastoreRelation test, m, None), None
>>HiveTableScan [id#106], (MetastoreRelation test, s, Some(s)), None
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>
>> On Thu, Jul 10, 2014 at 7:16 PM, Michael Armbrust > > wrote:
>>
>>> Hi Jerry,
>>>
>>> Thanks for reporting this.  It would be helpful if you could provide the
>>> output of the following command:
>>>
>>> println(hql("select s.id from m join s on (s.id=m_id)").queryExecution)
>>>
>>> Michael
>>>
>>>
>>> On Thu, Jul 10, 2014 at 8:15 AM, Jerry Lam  wrote:
>>>
 Hi Spark developers,

 I have the following hqls that spark will throw exceptions of this kind:
 14/07/10 15:07:55 INFO TaskSetManager: Loss was due to
 org.apache.spark.TaskKilledException [duplicate 17]
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0.0:736 failed 4 times, most recent failure: Exception failure in TID 167
 on host etl2-node05:
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function
 to evaluate expression. type: UnresolvedAttribute, tree: 'm.id

 org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.eval(unresolved.scala:59)

 org.apache.spark.sql.catalyst.expressions.Equals.eval(predicates.scala:151)

 org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$1.apply(basicOperators.scala:52)

 org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$1.apply(basicOperators.scala:52)
 scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 scala.collection.Iterator$class.foreach(Iterator.scala:727)
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 scala.collection.AbstractIterator.to(Iterator.scala:1157)

 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
 org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)

 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 org.apache.spark.scheduler.Task.run(Task.scala:51)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 java.lang.Thread.run(Thread.java:662)

 The hql looks like this (I trimmed the hql down to the essentials to
 demonstrate the potential bugs, the actual join is more complex and
 irrelevant to the bug):

 val hiveContext = new org.apache.spark.sql.hive.HiveContext(

Re: Error when testing with large sparse svm

2014-07-14 Thread crater
Hi xiangrui,


Where can I set the "spark.akka.frameSize" ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9616.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Error when testing with large sparse svm

2014-07-14 Thread Srikrishna S
If you use Scala, you can do:

  val conf = new SparkConf()
 .setMaster("yarn-client")
 .setAppName("Logistic regression SGD fixed")
 .set("spark.akka.frameSize", "100")
 .setExecutorEnv("SPARK_JAVA_OPTS", " -Dspark.akka.frameSize=100")
var sc = new SparkContext(conf)


I have been struggling with this too. I was trying to run Spark on the
KDDB dataset, which has about 29M features. It implodes and dies. Let
me know if you are able to figure out how to get things to work well
on really really wide datasets.

Regards,
Krishna

On Mon, Jul 14, 2014 at 10:18 AM, crater  wrote:
> Hi xiangrui,
>
>
> Where can I set the "spark.akka.frameSize" ?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9616.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark Streaming Json file groupby function

2014-07-14 Thread srinivas
Hi,
  I am new to Spark and Scala, and I am trying to do some aggregations on a
JSON file stream using Spark Streaming. I am able to parse the JSON string,
and it is converted to map(id -> 123, name -> srini, mobile -> 12324214,
score -> 123, test_type -> math). Now I want to use a groupBy function on each
student's map data and do some aggregations on the scores. Here is my
main function:
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10))
   // ssc.checkpoint("checkpoint")

val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
 val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
topicpMap).map(_._2)
 val jsonf =
lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,
Any]])

 
jsonf.print()

ssc.start()
ssc.awaitTermination()
  }

Can anyone please let me know how to use the groupBy function? Thanks.
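
(For reference, a minimal, untested sketch of one possible approach, using the
field names from the example map above and assuming the scores are numeric:)

    // (requires import org.apache.spark.streaming.StreamingContext._ for the pair operations)
    // Key each parsed record by test_type and pull out its score
    val scores = jsonf.map(m => (m("test_type").toString, m("score").toString.toDouble))

    // Collect all scores per test_type within each batch...
    val grouped = scores.groupByKey()

    // ...or aggregate directly, e.g. summing the scores per test_type
    val totals = scores.reduceByKey(_ + _)
    totals.print()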



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Trouble with spark-ec2 script: --ebs-vol-size

2014-07-14 Thread Ben Horner
Hello,

I'm using the spark-0.9.1-bin-hadoop1 distribution, and the ec2/spark-ec2
script within it to spin up a cluster.  I tried running my processing just
using the default (ephemeral) HDFS configuration, but my job errored out,
saying that there was no space left.  So now I'm trying to increase the size
of HDFS on the cluster.

My launch command:
ec2/spark-ec2 -k ... -i ... -z us-east-1d -s 4 -t m3.2xlarge
--ebs-vol-size=250 -m r3.2xlarge launch ...

My understanding is that I should get a cluster, where each slave node has
an ebs backed drive with 250 GB of storage, with a persistent HDFS set to
use these slave drives.

I turn off the ephemeral HDFS on the cluster master:
ephemeral-hdfs/bin/stop-all.sh

Then I turn on the persistent HDFS on the cluster master:
persistent-hdfs/bin/start-all.sh

Once I discovered the proper URL to hit the persistent name node page (not
the ephemeral standard 50070 port):
http://:60070/dfshealth.jsp

The page shows 4 nodes as expected, but the configured capacity shows as
31.5 GB, not the expected 1 TB (250 GB x 4)


Please help!
Don't be shy to let me know if I've made mis-steps, or if I'm not
understanding things correctly!
Thanks.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Trouble-with-spark-ec2-script-ebs-vol-size-tp9619.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Supported SQL syntax in Spark SQL

2014-07-14 Thread Michael Armbrust
You can find the parser here:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

In general the Hive parser provided by HQL is much more complete at the
moment.  Long term, we will likely stop using parser combinators and either
write a more complete parser or adopt one from an existing project.


On Mon, Jul 14, 2014 at 12:25 AM, Martin Gammelsæter <
martingammelsae...@gmail.com> wrote:

> I am very interested in the original question as well, is there any
> list (even if it is simply in the code) of all supported syntax for
> Spark SQL?
>
> On Mon, Jul 14, 2014 at 6:41 AM, Nicholas Chammas
>  wrote:
> >> Are you sure the code running on the cluster has been updated?
> >
> > I launched the cluster using spark-ec2 from the 1.0.1 release, so I’m
> > assuming that’s taken care of, at least in theory.
> >
> > I just spun down the clusters I had up, but I will revisit this tomorrow
> and
> > provide the information you requested.
> >
> > Nick
>
>
>
> --
> Mvh.
> Martin Gammelsæter
> 92209139
>


Gradient Boosted Machines

2014-07-14 Thread Daniel Bendavid
Hi,

My company is strongly considering implementing a recommendation engine that is 
built off of statistical models using Spark.  We attended the Spark Summit and 
were incredibly impressed with the technology and the entire community.  Since 
then, we have been exploring the technology and determining how we could use it 
for our specific needs.

One algorithm that we ideally want to use as part of our project is Gradient 
Boosted Machines.  We are aware that they have not yet been implemented in MLlib 
and would like to submit our request that they be considered for future 
implementation.  Additionally, we would love to see the AdaBoost algorithm 
implemented in MLlib and feature preprocessing implemented in Python (as it 
already exists for Scala).

Otherwise, thank you for taking our feedback and for providing us with this 
incredible technology.

Daniel


Re: Can we get a spark context inside a mapper

2014-07-14 Thread Rahul Bhojwani
I understand that the question is very unprofessional, but I am a newbie.
If you could share some link where I can ask such questions, if not here.

But please answer.


On Mon, Jul 14, 2014 at 6:52 PM, Rahul Bhojwani  wrote:

> Hey, My question is for this situation:
> Suppose we have 10 files each containing list of features in each row.
>
> Task is that for each file cluster the features in that file and write the
> corresponding cluster along with it in a new file.  So we have to generate
> 10 more files by applying clustering in each file individually.
>
> So can I do it this way, that get rdd of list of files and apply map.
> Inside the mapper function which will be handling each file, get another
> spark context and use Mllib kmeans to get the clustered output file.
>
> Please suggest the appropriate method to tackle this problem.
>
> Thanks,
> Rahul Kumar Bhojwani
> 3rd year, B.Tech
> Computer Science Engineering
> National Institute Of Technology, Karnataka
> 9945197359
>



-- 
Rahul K Bhojwani
3rd Year B.Tech
Computer Science and Engineering
National Institute of Technology, Karnataka


Re: Error when testing with large sparse svm

2014-07-14 Thread crater
Hi Krishna,

Thanks for your help. Are you able to get your 29M data running yet? I fixed
the previous problem by setting a larger spark.akka.frameSize, but now I get
some other errors below. Did you get these errors before?


14/07/14 11:32:20 ERROR TaskSchedulerImpl: Lost executor 1 on node7: remote
Akka client disassociated
14/07/14 11:32:20 WARN TaskSetManager: Lost TID 20 (task 13.0:0)
14/07/14 11:32:21 ERROR TaskSchedulerImpl: Lost executor 3 on node8: remote
Akka client disassociated
14/07/14 11:32:21 WARN TaskSetManager: Lost TID 21 (task 13.0:1)
14/07/14 11:32:23 ERROR TaskSchedulerImpl: Lost executor 6 on node3: remote
Akka client disassociated
14/07/14 11:32:23 WARN TaskSetManager: Lost TID 22 (task 13.0:0)
14/07/14 11:32:25 ERROR TaskSchedulerImpl: Lost executor 0 on node4: remote
Akka client disassociated
14/07/14 11:32:25 WARN TaskSetManager: Lost TID 23 (task 13.0:1)
14/07/14 11:32:26 ERROR TaskSchedulerImpl: Lost executor 5 on node1: remote
Akka client disassociated
14/07/14 11:32:26 WARN TaskSetManager: Lost TID 24 (task 13.0:0)
14/07/14 11:32:28 ERROR TaskSchedulerImpl: Lost executor 7 on node6: remote
Akka client disassociated
14/07/14 11:32:28 WARN TaskSetManager: Lost TID 26 (task 13.0:0)
14/07/14 11:32:28 ERROR TaskSetManager: Task 13.0:0 failed 4 times; aborting
job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 13.0:0 failed 4 times, most recent failure: TID 26 on
host node6 failed for unknown reason
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9623.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Can we get a spark context inside a mapper

2014-07-14 Thread Daniel Siegmann
Rahul, I'm not sure what you mean by your question being "very
unprofessional". You can feel free to answer such questions here. You may
or may not receive an answer, and you shouldn't necessarily expect to have
your question answered within five hours.

I've never tried to do anything like your case. I imagine the easiest thing
would be to read and process each file individually, since you are
intending to produce a separate result for each. You could also look at
SparkContext.wholeTextFiles - maybe that will be of some use if your files are small
- but I don't know of any corresponding save method which would generate
files with different names from within a single RDD.


On Mon, Jul 14, 2014 at 2:30 PM, Rahul Bhojwani  wrote:

> I understand that the question is very unprofessional, but I am a newbie.
> If you could share some link where I can ask such questions, if not here.
>
> But please answer.
>
>
> On Mon, Jul 14, 2014 at 6:52 PM, Rahul Bhojwani <
> rahulbhojwani2...@gmail.com> wrote:
>
>> Hey, My question is for this situation:
>> Suppose we have 10 files each containing list of features in each row.
>>
>> Task is that for each file cluster the features in that file and write
>> the corresponding cluster along with it in a new file.  So we have to
>> generate 10 more files by applying clustering in each file
>> individually.
>>
>> So can I do it this way, that get rdd of list of files and apply map.
>> Inside the mapper function which will be handling each file, get another
>> spark context and use Mllib kmeans to get the clustered output file.
>>
>> Please suggest the appropriate method to tackle this problem.
>>
>> Thanks,
>> Rahul Kumar Bhojwani
>> 3rd year, B.Tech
>> Computer Science Engineering
>> National Institute Of Technology, Karnataka
>> 9945197359
>>
>
>
>
> --
> Rahul K Bhojwani
> 3rd Year B.Tech
> Computer Science and Engineering
> National Institute of Technology, Karnataka
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Can we get a spark context inside a mapper

2014-07-14 Thread Matei Zaharia
You currently can't use SparkContext inside a Spark task, so in this case you'd 
have to call some kind of local K-means library. One example you can try to use 
is Weka (http://www.cs.waikato.ac.nz/ml/weka/). You can then load your text 
files as an RDD of strings with SparkContext.wholeTextFiles and call Weka on 
each one.
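
A rough sketch of that approach in Scala (untested; clusterLocally is just a
placeholder for whatever local clustering call you plug in, e.g. Weka):

import org.apache.spark.{SparkConf, SparkContext}

object PerFileClustering {
  // Stand-in: run a single-machine clustering library (e.g. Weka) on one file's
  // contents and return the rows annotated with their cluster ids.
  def clusterLocally(contents: String): String = contents

  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("PerFileClustering"))
    // one (path, fileContents) record per input file
    val files = sc.wholeTextFiles("hdfs:///input/dir")
    val clustered = files.map { case (path, contents) => (path, clusterLocally(contents)) }
    // this writes one output record per input file; producing one separate output
    // *file* per input would need a custom save (e.g. writing from a foreach)
    clustered.saveAsTextFile("hdfs:///output/dir")
    sc.stop()
  }
}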

Matei

On Jul 14, 2014, at 11:30 AM, Rahul Bhojwani  
wrote:

> I understand that the question is very unprofessional, but I am a newbie. If 
> you could share some link where I can ask such questions, if not here. 
> 
> But please answer.
> 
> 
> On Mon, Jul 14, 2014 at 6:52 PM, Rahul Bhojwani  
> wrote:
> Hey, My question is for this situation:
> Suppose we have 10 files each containing list of features in each row.
> 
> Task is that for each file cluster the features in that file and write the 
> corresponding cluster along with it in a new file.  So we have to generate 
> 10 more files by applying clustering in each file individually.
> 
> So can I do it this way, that get rdd of list of files and apply map. Inside 
> the mapper function which will be handling each file, get another spark 
> context and use Mllib kmeans to get the clustered output file.
> 
> Please suggest the appropriate method to tackle this problem.
> 
> Thanks, 
> Rahul Kumar Bhojwani
> 3rd year, B.Tech
> Computer Science Engineering
> National Institute Of Technology, Karnataka
> 9945197359
> 
> 
> 
> 
> -- 
> Rahul K Bhojwani
> 3rd Year B.Tech
> Computer Science and Engineering
> National Institute of Technology, Karnataka



Re: Ideal core count within a single JVM

2014-07-14 Thread Matei Zaharia
Probably something like 8 is best on this kind of machine. What operations are 
you doing though? It's possible that something else is a contention point at 48 
threads, e.g. a common one we've seen is the Linux file system.

Matei

On Jul 13, 2014, at 4:03 PM, lokesh.gidra  wrote:

> Hello,
> 
> What would be an ideal core count to run a spark job in local mode to get
> best utilization of CPU? Actually I have a 48-core machine but the
> performance of local[48] is poor as compared to local[10].
> 
> 
> Lokesh
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Ideal-core-count-within-a-single-JVM-tp9566.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Error when testing with large sparse svm

2014-07-14 Thread Srikrishna S
That is exactly the same error that I got. I am still having no success.

Regards,
Krishna

On Mon, Jul 14, 2014 at 11:50 AM, crater  wrote:
> Hi Krishna,
>
> Thanks for your help. Are you able to get your 29M data running yet? I fix
> the previous problem by setting larger spark.akka.frameSize, but now I get
> some other errors below. Did you get these errors before?
>
>
> 14/07/14 11:32:20 ERROR TaskSchedulerImpl: Lost executor 1 on node7: remote
> Akka client disassociated
> 14/07/14 11:32:20 WARN TaskSetManager: Lost TID 20 (task 13.0:0)
> 14/07/14 11:32:21 ERROR TaskSchedulerImpl: Lost executor 3 on node8: remote
> Akka client disassociated
> 14/07/14 11:32:21 WARN TaskSetManager: Lost TID 21 (task 13.0:1)
> 14/07/14 11:32:23 ERROR TaskSchedulerImpl: Lost executor 6 on node3: remote
> Akka client disassociated
> 14/07/14 11:32:23 WARN TaskSetManager: Lost TID 22 (task 13.0:0)
> 14/07/14 11:32:25 ERROR TaskSchedulerImpl: Lost executor 0 on node4: remote
> Akka client disassociated
> 14/07/14 11:32:25 WARN TaskSetManager: Lost TID 23 (task 13.0:1)
> 14/07/14 11:32:26 ERROR TaskSchedulerImpl: Lost executor 5 on node1: remote
> Akka client disassociated
> 14/07/14 11:32:26 WARN TaskSetManager: Lost TID 24 (task 13.0:0)
> 14/07/14 11:32:28 ERROR TaskSchedulerImpl: Lost executor 7 on node6: remote
> Akka client disassociated
> 14/07/14 11:32:28 WARN TaskSetManager: Lost TID 26 (task 13.0:0)
> 14/07/14 11:32:28 ERROR TaskSetManager: Task 13.0:0 failed 4 times; aborting
> job
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Task 13.0:0 failed 4 times, most recent failure: TID 26 on
> host node6 failed for unknown reason
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9623.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Can we get a spark context inside a mapper

2014-07-14 Thread Jerry Lam
Hi there,

I think the question is interesting; a spark of sparks = spark
I wonder if you can use the spark job server (
https://github.com/ooyala/spark-jobserver)?

So in the Spark task that requires a new Spark context, instead of creating
it in the task, contact the job server to create one, and pass the data in
the task as the data source via HDFS/Tachyon/S3. Wait until the
sub-task is done, then continue. Since the job server has the notion of a job
id, you might use it as a reference to the sub-task.

I don't know if this is a good idea or bad one. Maybe this is an
anti-pattern of spark, but maybe not.

HTH,

Jerry



On Mon, Jul 14, 2014 at 3:09 PM, Matei Zaharia 
wrote:

> You currently can't use SparkContext inside a Spark task, so in this case
> you'd have to call some kind of local K-means library. One example you can
> try to use is Weka (http://www.cs.waikato.ac.nz/ml/weka/). You can then
> load your text files as an RDD of strings with SparkContext.wholeTextFiles
> and call Weka on each one.
>
> Matei
>
> On Jul 14, 2014, at 11:30 AM, Rahul Bhojwani 
> wrote:
>
> I understand that the question is very unprofessional, but I am a newbie.
> If you could share some link where I can ask such questions, if not here.
>
> But please answer.
>
>
> On Mon, Jul 14, 2014 at 6:52 PM, Rahul Bhojwani <
> rahulbhojwani2...@gmail.com> wrote:
>
>> Hey, My question is for this situation:
>> Suppose we have 10 files each containing list of features in each row.
>>
>> Task is that for each file cluster the features in that file and write
>> the corresponding cluster along with it in a new file.  So we have to
>> generate 10 more files by applying clustering in each file
>> individually.
>>
>> So can I do it this way, that get rdd of list of files and apply map.
>> Inside the mapper function which will be handling each file, get another
>> spark context and use Mllib kmeans to get the clustered output file.
>>
>> Please suggest the appropriate method to tackle this problem.
>>
>> Thanks,
>> Rahul Kumar Bhojwani
>> 3rd year, B.Tech
>> Computer Science Engineering
>> National Institute Of Technology, Karnataka
>> 9945197359
>>
>
>
>
> --
> Rahul K Bhojwani
> 3rd Year B.Tech
> Computer Science and Engineering
> National Institute of Technology, Karnataka
>
>
>


Re: writing FLume data to HDFS

2014-07-14 Thread Tathagata Das
Stepping a bit back, if you just want to write flume data to HDFS, you can
use flume's HDFS sink for that.

Trying to do this using Spark Streaming and SparkFlumeEvent is
unnecessarily complex. And I guess it is tricky to write the raw bytes from
the SparkFlumeEvent into a file. If you want to do it this way, I suggest
trying this (not tested, pure guesswork).

RDD[SparkFlumeEvent] ---> map to get the RDD of payload bytes ---> do
RDD.mapPartitions() to write each partition of the RDD's bytes into an HDFS
file (using HDFS's file output stream interface)

You will have to take care of making the file names of each partition
unique, and of dealing with failures in writing, etc.
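
A rough Scala sketch of that pipeline (equally untested; the output directory,
the naming scheme and the assumption that payloads are newline-delimited text
are all placeholders to adapt). It uses foreachPartition since nothing needs
to be returned:

import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// flumeStream: DStream[SparkFlumeEvent] from FlumeUtils.createStream(...)
flumeStream.foreachRDD { rdd =>
  // pull the raw payload bytes out of each event
  val payloads = rdd.map(_.event.getBody.array())
  payloads.foreachPartition { iter =>
    // one file per partition per batch; the UUID keeps file names unique
    val fs = FileSystem.get(new Configuration())
    val out = fs.create(new Path("/flume-output/part-" + UUID.randomUUID()))
    try {
      iter.foreach { bytes =>
        out.write(bytes)
        out.write('\n')
      }
    } finally {
      out.close()
    }
  }
}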

TD



On Mon, Jul 14, 2014 at 9:29 AM, Sundaram, Muthu X. <
muthu.x.sundaram@sabre.com> wrote:

> I am not sure how to write it… I tried writing to the local file system using
> FileWriter and PrintWriter. I tried it inside the while loop. I am able to
> get the text and print it, but it fails when I use regular Java
> classes. Shouldn’t I use regular Java classes here? Can I write only to
> HDFS? Do I have to create the file in HDFS using the HDFS classes? I
> thought of using Spark’s saveAsTextFile(). But I have a JavaRDD<SparkFlumeEvent>,
> not a JavaRDD<String>. So I am not sure whether saveAsTextFile() will
> work. I appreciate any guidance here. How do I get more code examples?
> Books, URLs?
>
>
>
>
>
>   flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
>
>   @Override
>
>   public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws
> Exception {
>
>  String logRecord = null;
>
>  List<SparkFlumeEvent> events = eventsData.collect();
>
>  Iterator<SparkFlumeEvent> batchedEvents =
> events.iterator();
>
>  long t1 = System.currentTimeMillis();
>
>  AvroFlumeEvent avroEvent = null;
>
>  ByteBuffer bytePayload =
> null;
>
>  // All the user level data is carried as payload in
> Flume Event
>
>  while(batchedEvents.hasNext()) {
>
> SparkFlumeEvent flumeEvent =
> batchedEvents.next();
>
> avroEvent = flumeEvent.event();
>
> bytePayload = avroEvent.getBody();
>
> logRecord = new
> String(bytePayload.array());
>
>
>
> System.out.println("LOG RECORD = " +
> logRecord);
>
>
>
>??I was trying to write the data to hdfs..but
> it fails…
>
>
>
>
>
> *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com]
> *Sent:* Friday, July 11, 2014 1:43 PM
> *To:* user@spark.apache.org
> *Cc:* u...@spark.incubator.apache.org
> *Subject:* Re: writing FLume data to HDFS
>
>
>
> What is the error you are getting when you say "??I was trying to write
> the data to hdfs..but it fails…"
>
>
>
> TD
>
>
>
> On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. <
> muthu.x.sundaram@sabre.com> wrote:
>
> I am new to spark. I am trying to do the following.
>
> Netcat -> Flume -> Spark Streaming (process Flume Data) -> HDFS.
>
>
>
> My flume config file has following set up.
>
>
>
> Source = netcat
>
> Sink=avrosink.
>
>
>
> Spark Streaming code:
>
> I am able to print data from flume to the monitor. But I am struggling to
> create a file. In order to get the real data I need to convert SparkEvent
> to avroEvent.
>
> JavaRDD.saveAsText() might not work, because the JavaRDD is a collection of
> SparkEvent. Do I need to convert this into a collection of
> JavaRDD<String>?
>
> Please share any code examples… Thanks.
>
>
>
> Code:
>
>
>
>  Duration batchInterval = new Duration(2000);
>
> SparkConf sparkConf = new
> SparkConf().setAppName("JavaFlumeEventCount");
>
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> batchInterval);
>
> JavaDStream<SparkFlumeEvent> flumeStream =
> FlumeUtils.createStream(ssc, host, port);
>
>
>
> flumeStream.count();
>
> flumeStream.foreachRDD(new
> Function2<JavaRDD<SparkFlumeEvent>,JavaRDD<SparkFlumeEvent>,Void>(){
>
>  @Override
>
>  public Void call(JavaRDD<SparkFlumeEvent>
> events1, JavaRDD<SparkFlumeEvent> events2) throws Exception{
>
> events1.saveasTextFile("output.txt");
>
> return null;
>
>  }
>
>  });
>
>
>
> /*flumeStream.count().map(new Function<Long, String>() {
>
>   @Override
>
>   public String call(Long in) {
>
> return "Received " + in + " flume events.";
>
>   }
>
> }).print();*/
>
>
>
> flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
>
>   @Override
>
>   public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws
> Exception {
>
>  String logRecord = null;
>
>  List<SparkFlumeEvent> events = eventsData.collect();
>
>  Iterator<SparkFlumeEvent> batchedEvents =
> events.iterator();
>
>
>
>
>
>  long t1 = System.currentTimeMillis();
>
>  AvroFlumeEvent avroEvent = null;
>
>  ByteBuffer bytePayload = null;
>
>
>
>
>  // All the user level data is carried as payload in
> Flume Event
>
>
>
>  

Re: Ideal core count within a single JVM

2014-07-14 Thread lokesh.gidra
Thanks a lot for replying back.

Actually, I am running the SparkPageRank example with 160GB heap (I am sure
the problem is not GC because the excess time is being spent in java code
only).

What I have observed in Jprofiler and Oprofile outputs is that the amount of
time spent in following 2 functions increases substantially with increasing
N:

1) java.io.ObjectOutputStream.writeObject0
2) scala.Tuple2.hashCode 

I don't think that Linux file system could be causing the issue as my
machine has 256GB RAM, and I am using a tmpfs for java.io.tmpdir. So, I
don't think there is much disk access involved, if that is what you meant.

Regards,
Lokesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Ideal-core-count-within-a-single-JVM-tp9566p9630.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming Json file groupby function

2014-07-14 Thread Tathagata Das
You have to import StreamingContext._  to enable groupByKey operations on
DStreams. After importing that you can apply groupByKey on any DStream,
that is a DStream of key-value pairs (e.g. DStream[(String, Int)]) . The
data in each pair RDDs will be grouped by the first element in the tuple as
the grouping element.
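
For example, something along these lines (a minimal sketch; the field names
match the parsed map in your code, and the aggregation is just a per-batch
average of the scores):

import org.apache.spark.streaming.StreamingContext._   // enables the pair-DStream operations

// jsonf: DStream[Map[String, Any]] as in your code
val scoresByStudent = jsonf.map { m =>
  (m("name").toString, m("score").toString.toDouble)    // key each record by student name
}

// group all scores of a student within each batch, then aggregate
val avgScores = scoresByStudent.groupByKey().map { case (name, scores) =>
  (name, scores.sum / scores.size)
}
avgScores.print()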

TD


On Mon, Jul 14, 2014 at 10:59 AM, srinivas  wrote:

> hi
>   I am new to spark and scala and I am trying to do some aggregations on
> json file stream using Spark Streaming. I am able to parse the json string
> and it is converted to map(id -> 123, name -> srini, mobile -> 12324214,
> score -> 123, test_type -> math) now i want to use GROUPBY function on each
> student map data and wanted to do some aggregations on scores. Here is my
> main function
> val Array(zkQuorum, group, topics, numThreads) = args
> val sparkConf = new SparkConf().setAppName("KafkaWordCount")
> val ssc = new StreamingContext(sparkConf, Seconds(10))
>// ssc.checkpoint("checkpoint")
>
> val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
>  val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
> topicpMap).map(_._2)
>  val jsonf =
>
> lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,
> Any]])
>
>
> jsonf.print()
>
> ssc.start()
> ssc.awaitTermination()
>   }
>
> Can anyone please Let me know how to use groupby function..thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Number of executors change during job running

2014-07-14 Thread Tathagata Das
After using repartition(300), how many executors did it run on? By the way,
repartition(300) means it will divide the shuffled data into 300
partitions. Since there are many cores on each of the 300
machines/executors, these partitions (each requiring a core) may not be
spread across all 300 executors. Hence, if you really want to spread it across
all 300 executors, you may have to bump up the partitions even more. However,
increasing the number of partitions too much may not be beneficial, and you will
have to play around with the number to figure out the sweet spot that reduces the
time to process the stage / time to process the whole batch.
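
For example (the numbers are illustrative only and kafkaStream stands for your
input DStream; tune for your cluster):

// spread the received data across the cluster before the heavy stages; using a
// small multiple of the executor count gives every executor several partitions
val repartitioned = kafkaStream.repartition(900)   // e.g. 3 x 300 executors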

TD


On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay 
wrote:

> Hi Tathagata,
>
> Do you mean that the data is not shuffled until the reduce stage? That
> means groupBy still only uses 2 machines?
>
> I think I used repartition(300) after I read the data from Kafka into
> DStream. It seems that it did not guarantee that the map or reduce stages
> will be run on 300 machines. I am currently trying to initiate 100 DStream
> from KafkaUtils.createDStream and union them. Now the reduce stages had
> around 80 machines for all the batches. However, this method will introduce
> many dstreams. It will be good if we can control the number of executors in
> the groupBy operation because the calculation needs to be finished within 1
> minute for different size of input data based on our production need.
>
> Thanks!
>
>
> Bill
>
>
> On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Aah, I get it now. That is because the input data streams is replicated
>> on two machines, so by locality the data is processed on those two
>> machines. So the "map" stage on the data uses 2 executors, but the "reduce"
>> stage, (after groupByKey) the saveAsTextFiles would use 300 tasks. And the
>> default parallelism takes into affect only when the data is explicitly
>> shuffled around.
>>
>> You can fix this by explicitly repartitioning the data.
>>
>> inputDStream.repartition(partitions)
>>
>> This is covered in the streaming tuning guide
>> 
>> .
>>
>> TD
>>
>>
>>
>> On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay 
>> wrote:
>>
>>> Hi folks,
>>>
>>> I just ran another job that only received data from Kafka, did some
>>> filtering, and then save as text files in HDFS. There was no reducing work
>>> involved. Surprisingly, the number of executors for the saveAsTextFiles
>>> stage was also 2 although I specified 300 executors in the job submission.
>>> As a result, the simple save file action took more than 2 minutes. Do you
>>> have any idea how Spark determined the number of executors
>>> for different stages?
>>>
>>> Thanks!
>>>
>>> Bill
>>>
>>>
>>> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay 
>>> wrote:
>>>
 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just a one-to-one mapping, which should not
 noticeably increase running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. And the other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val parition = 300
 val zkQuorum = "zk1,zk2,zk3"
 val group = "my-group-" + currentTime.toString
 val topics = "topic1,topic2,topic3,topic4"
 val numThreads = 4
 val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + "checkpoint")
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)

 .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data => (data("id").toString,
 timestampToUTCUnix(data("time").toString),

  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
data("id3").toString,
 data("log_type").toString, data("sub_log_type").toString))
 val timeDiff = 3600L
 val filteredFields = fields.filter(field => abs(field._2 -
 field._3) <= timeDiff)

 val watchTimeFields = filteredFields.map(fields => (fields._1,
 fields._2, fields._4, fields._5, fields._7))
 val watchTimeTuples = watchTimeFields.map(fields =>
 getWatchtimeTuple(fields))
 val programDuids = watchTimeTuples.map(fields => (fields._3,
 fields._1)).groupByKey(partition)
 val programDuidNum = programDuids.map{case(key, value) => (key,
 value.toSe

Re: All of the tasks have been completed but the Stage is still shown as "Active"?

2014-07-14 Thread Tathagata Das
Seems like it is related. Possibly those PRs that Andrew mentioned are
going to fix this issue.


On Fri, Jul 11, 2014 at 5:51 AM, Haopu Wang  wrote:

>   I saw some exceptions like this in the driver log. Can you shed some
> light? Is it related to the behaviour?
>
>
>
> 14/07/11 20:40:09 ERROR LiveListenerBus: Listener JobProgressListener
> threw an exception
>
> java.util.NoSuchElementException: key not found: 64019
>
>  at scala.collection.MapLike$class.default(MapLike.scala:228)
>
>  at scala.collection.AbstractMap.default(Map.scala:58)
>
>  at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>
>  at
> org.apache.spark.ui.jobs.JobProgressListener.onStageCompleted(JobProgressListener.scala:78)
>
>  at
> org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48)
>
>  at
> org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48)
>
>  at
> org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)
>
>  at
> org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79)
>
>  at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
>  at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
>  at
> org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79)
>
>  at
> org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:48)
>
>  at
> org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32)
>
>  at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
>
>  at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
>
>  at scala.Option.foreach(Option.scala:236)
>
>  at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56)
>
>  at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
>
>  at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
>
>  at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>
>  at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)
>
>
>  --
>
> *From:* Haopu Wang
> *Sent:* Thursday, July 10, 2014 7:38 PM
> *To:* user@spark.apache.org
> *Subject:* RE: All of the tasks have been completed but the Stage is
> still shown as "Active"?
>
>
>
> I didn't keep the driver's log. It's a lesson.
>
> I will try to run it again to see if it happens again.
>
>
>  --
>
> *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com]
> *Sent:* 2014年7月10日 17:29
> *To:* user@spark.apache.org
> *Subject:* Re: All of the tasks have been completed but the Stage is
> still shown as "Active"?
>
>
>
> Do you see any errors in the logs of the driver?
>
>
>
> On Thu, Jul 10, 2014 at 1:21 AM, Haopu Wang  wrote:
>
> I'm running an App for hours in a standalone cluster. From the data
> injector and "Streaming" tab of web ui, it's running well.
>
> However, I see quite a lot of Active stages in the web UI even though some of them
> have all of their tasks completed.
>
> I attach a screenshot for your reference.
>
> Do you ever see this kind of behavior?
>
>
>


Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-14 Thread Tathagata Das
That depends on your requirements. If you want to process the 250 GB input
file as a "stream" to emulate the stream of data, then it should be split
into files (such that event ordering is maintained in those splits, if
necessary). And then those splits should be moved one-by-one into the
directory monitored by the streaming app. You will need to figure out the
split size, etc., depending on what your intended batch size (in terms of
seconds) is in the streaming app.
And it doesn't really need to be a multiple of the HDFS block size.
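
A minimal sketch of the consuming side (the paths and batch interval are
placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("FileStreamReplay")
val ssc = new StreamingContext(conf, Seconds(30))   // pick a batch size that matches your split rate

// picks up files that are moved (atomically) into this directory while the app runs
val lines = ssc.textFileStream("hdfs:///streaming/incoming")
lines.count().print()

ssc.start()
ssc.awaitTermination()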

TD



On Sat, Jul 12, 2014 at 7:31 AM, M Singh  wrote:

> Thanks TD.
>
> BTW - If I have input file ~ 250 GBs - Is there any guideline on whether
> to use:
>
>- a single input (250 GB) (in this case is there any max upper bound)
>or
>- split into 1000 files each of 250 MB (hdfs block size is 250 MB) or
>- a multiple of hdfs block size.
>
> Mans
>
>
>
>   On Friday, July 11, 2014 4:38 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>
> The model for file stream is to pick up and process new files written
> atomically (by move) into a directory. So your file is being processed in a
> single batch, and then it's waiting for any new files to be written into
> that directory.
>
> TD
>
>
> On Fri, Jul 11, 2014 at 11:46 AM, M Singh  wrote:
>
> So, is it expected for the process to generate stages/tasks even after
> processing a file ?
>
> Also, is there a way to figure out the file that is getting processed and
> when that process is complete ?
>
> Thanks
>
>
>   On Friday, July 11, 2014 1:51 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>
> Whenever you need to do a shuffle-based operation like reduceByKey,
> groupByKey, join, etc., the system is essentially redistributing the data
> across the cluster and it needs to know how many parts it should divide the
> data into. That's where the default parallelism is used.
>
> TD
>
>
> On Fri, Jul 11, 2014 at 3:16 AM, M Singh  wrote:
>
> Hi TD:
>
> The input file is on hdfs.
>
>  The file is approx 2.7 GB and when the process starts, there are 11
> tasks (since hdfs block size is 256M) for processing and 2 tasks for reduce
> by key.  After the file has been processed, I see new stages with 2 tasks
> that continue to be generated. I understand this value (2) is the default
> value for spark.default.parallelism but don't quite understand how is the
> value determined for generating tasks for reduceByKey, how is it used
> besides reduceByKey and what should be the optimal value for this.
>
>  Thanks.
>
>
>   On Thursday, July 10, 2014 7:24 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>
> How are you supplying the text file?
>
>
> On Wed, Jul 9, 2014 at 11:51 AM, M Singh  wrote:
>
> Hi Folks:
>
> I am working on an application which uses spark streaming (version 1.1.0
> snapshot on a standalone cluster) to process text file and save counters in
> cassandra based on fields in each row.  I am testing the application in two
> modes:
>
>- Process each row and save the counter in cassandra.  In this
>scenario after the text file has been consumed, there is no task/stages
>seen in the spark UI.
>- If instead I use reduce by key before saving to cassandra, the spark
>UI shows continuous generation of tasks/stages even after processing the
>file has been completed.
>
> I believe this is because the reduce by key requires merging of data from
> different partitions.  But I was wondering if anyone has any
> insights/pointers for understanding this difference in behavior and how to
> avoid generating tasks/stages when there is no data (new file) available.
>
> Thanks
>
> Mans
>
>
>
>
>
>
>
>
>
>
>


Re: Ideal core count within a single JVM

2014-07-14 Thread Matei Zaharia
Are you increasing the number of parallel tasks with cores as well? With more 
tasks there will be more data communicated and hence more calls to these 
functions.

Unfortunately contention is kind of hard to measure, since often the result is 
that you see many cores idle as they're waiting on a lock. ObjectOutputStream 
should not lock anything, but if it's blocking on a FileOutputStream to write 
data, that could be a problem. Look for "BLOCKED" threads in a stack trace too 
(do jstack on your Java process and look at the TaskRunner threads).

Incidentally you can probably speed this up by using Kryo serialization instead 
of Java (see http://spark.apache.org/docs/latest/tuning.html). That might make 
it less CPU-bound and it would also create less IO.
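
For example, a sketch of the configuration side (register your own classes via
a KryoRegistrator as described in the tuning guide if you want the full benefit):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("SparkPageRank")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // optional: .set("spark.kryo.registrator", "mypackage.MyKryoRegistrator")
val sc = new SparkContext(conf)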

Matei

On Jul 14, 2014, at 12:23 PM, lokesh.gidra  wrote:

> Thanks a lot for replying back.
> 
> Actually, I am running the SparkPageRank example with 160GB heap (I am sure
> the problem is not GC because the excess time is being spent in java code
> only).
> 
> What I have observed in Jprofiler and Oprofile outputs is that the amount of
> time spent in following 2 functions increases substantially with increasing
> N:
> 
> 1) java.io.ObjectOutputStream.writeObject0
> 2) scala.Tuple2.hashCode 
> 
> I don't think that Linux file system could be causing the issue as my
> machine has 256GB RAM, and I am using a tmpfs for java.io.tmpdir. So, I
> don't think there is much disk access involved, if that is what you meant.
> 
> Regards,
> Lokesh
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Ideal-core-count-within-a-single-JVM-tp9566p9630.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-14 Thread Tathagata Das
When you are sending data using simple socket code to send messages, are
those messages "\n" delimited? If they are not, then the receiver of
socketTextStream won't identify them as separate events, and will keep buffering
them.
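
For example, a bare-bones emulator that terminates each message with "\n" and
flushes the socket (a sketch; the port and message are arbitrary):

import java.io.PrintWriter
import java.net.ServerSocket

val server = new ServerSocket(9999)          // socketTextStream("localhost", 9999) connects here
val socket = server.accept()
val out = new PrintWriter(socket.getOutputStream, true)   // autoFlush = true
while (true) {
  out.println("hello at " + System.currentTimeMillis())   // println appends the "\n" delimiter
  Thread.sleep(1000)
}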

TD


On Sun, Jul 13, 2014 at 10:49 PM, kytay  wrote:

> Hi Tobias
>
> I have been using "local[4]" to test.
> My problem is likely caused by the tcp host server that I am trying the
> emulate. I was trying to emulate the tcp host to send out messages.
> (although I am not sure at the moment :D)
>
> First way I tried was to use a tcp tool called, Hercules.
>
> Second way was to write a simple socket code to send message at interval.
> Like the one shown in #2 of my first post. I suspect the reason why it
> don't
> work is due the messages are not "flush" so no message was received on
> Spark
> Streaming.
>
> I think I will need to do more testing to understand the behavior. I am
> currently not sure why "nc -lk" is working, and not the other tools or
> codes
> I am testing with.
>
> Regards.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9588.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Error in JavaKafkaWordCount.java example

2014-07-14 Thread Tathagata Das
Are you compiling it within Spark using Spark's recommended way (see the doc
web page)? Or are you compiling it in your own project? In the latter case,
make sure you are using Scala 2.10.4.
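
For example, in an sbt build that would look something like this (a sketch; the
artifact names assume Spark 1.0.x built for Scala 2.10):

// build.sbt
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
)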

TD


On Sun, Jul 13, 2014 at 6:43 AM, Mahebub Sayyed 
wrote:

> Hello,
>
> I am referring following example:
>
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>
> I am getting the following *Compilation Error*:
> \example\JavaKafkaWordCount.java:[62,70] error: cannot access ClassTag
>
> Please help me.
> Thanks in advance.
>
> --
> *Regards,*
> *Mahebub Sayyed*
>


Re: Spark SQL 1.0.1 error on reading fixed length byte array

2014-07-14 Thread Michael Armbrust
This is not supported yet, but there is a PR open to fix it:
https://issues.apache.org/jira/browse/SPARK-2446


On Mon, Jul 14, 2014 at 4:17 AM, Pei-Lun Lee  wrote:

> Hi,
>
> I am using spark-sql 1.0.1 to load parquet files generated from method
> described in:
>
> https://gist.github.com/massie/7224868
>
>
> When I try to submit a select query with columns of type fixed length byte
> array, the following error pops up:
>
>
> 14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at
> basicOperators.scala:100
> org.apache.spark.SparkDriverExecutionException: Execution error
> at
> org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581)
> at
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559)
> Caused by: parquet.io.ParquetDecodingException: Can not read value at 0 in
> block -1 in file s3n://foo/bar/part-r-0.snappy.parquet
> at
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177)
> at
> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
> at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
> at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
> at
> org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574)
> ... 1 more
> Caused by: java.lang.ClassCastException: Expected instance of primitive
> converter but got
> "org.apache.spark.sql.parquet.CatalystNativeArrayConverter"
> at parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30)
> at
> parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:264)
> at
> parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60)
> at
> parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74)
> at
> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110)
> at
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
> ... 24 more
>
>
> Is fixed length byte array supposed to work in this version? I noticed
> that other array types like int or string already work.
>
> Thanks,
> --
> Pei-Lun
>
>


Re: can't print DStream after reduce

2014-07-14 Thread Tathagata Das
The problem is not really with local[1] or local. The problem arises when
there are more input streams than there are cores.
But I agree, for people who are just beginning to use it by running it
locally, there should be a check addressing this.

I made a JIRA for this.
https://issues.apache.org/jira/browse/SPARK-2464

TD


On Sun, Jul 13, 2014 at 4:26 PM, Sean Owen  wrote:

> How about a PR that rejects a context configured for local or local[1]? As
> I understand it is not intended to work and has bitten several people.
> On Jul 14, 2014 12:24 AM, "Michael Campbell" 
> wrote:
>
>> This almost had me not using Spark; I couldn't get any output.  It is not
>> at all obvious what's going on here to the layman (and to the best of my
>> knowledge, not documented anywhere), but now you know you'll be able to
>> answer this question for the numerous people that will also have it.
>>
>>
>> On Sun, Jul 13, 2014 at 5:24 PM, Walrus theCat 
>> wrote:
>>
>>> Great success!
>>>
>>> I was able to get output to the driver console by changing the
>>> construction of the Streaming Spark Context from:
>>>
>>>  val ssc = new StreamingContext("local" /**TODO change once a cluster is
>>> up **/,
>>> "AppName", Seconds(1))
>>>
>>>
>>> to:
>>>
>>> val ssc = new StreamingContext("local[2]" /**TODO change once a cluster
>>> is up **/,
>>> "AppName", Seconds(1))
>>>
>>>
>>> I found something that tipped me off that this might work by digging
>>> through this mailing list.
>>>
>>>
>>> On Sun, Jul 13, 2014 at 2:00 PM, Walrus theCat 
>>> wrote:
>>>
 More strange behavior:

 lines.foreachRDD(x => println(x.first)) // works
 lines.foreachRDD(x => println((x.count,x.first))) // no output is
 printed to driver console




 On Sun, Jul 13, 2014 at 11:47 AM, Walrus theCat >>> > wrote:

>
> Thanks for your interest.
>
> lines.foreachRDD(x => println(x.count))
>
>  And I got 0 every once in a while (which I think is strange, because
> lines.print prints the input I'm giving it over the socket.)
>
>
> When I tried:
>
> lines.map(_->1).reduceByKey(_+_).foreachRDD(x => println(x.count))
>
> I got no count.
>
> Thanks
>
>
> On Sun, Jul 13, 2014 at 11:34 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Try doing DStream.foreachRDD and then printing the RDD count and
>> further inspecting the RDD.
>>  On Jul 13, 2014 1:03 AM, "Walrus theCat" 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a DStream that works just fine when I say:
>>>
>>> dstream.print
>>>
>>> If I say:
>>>
>>> dstream.map(_,1).print
>>>
>>> that works, too.  However, if I do the following:
>>>
>>> dstream.reduce{case(x,y) => x}.print
>>>
>>> I don't get anything on my console.  What's going on?
>>>
>>> Thanks
>>>
>>
>

>>>
>>


Re: Ideal core count within a single JVM

2014-07-14 Thread lokesh.gidra
I am only playing with 'N' in local[N]. I thought that by increasing N, Spark
would automatically use more parallel tasks. Isn't that so? Can you please tell
me how I can modify the number of parallel tasks?

For me, there are hardly any threads in the BLOCKED state in the jstack output. In
'top' I see my application consuming all 48 cores all the time with
N=48.

I am attaching two jstack outputs that I took while the application was
running.


Lokesh

(attachments: lessoutput3.lessoutput3, lessoutput4.lessoutput4)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Ideal-core-count-within-a-single-JVM-tp9566p9640.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Catalyst dependency on Spark Core

2014-07-14 Thread Michael Armbrust
Yeah, sadly this dependency was introduced when someone consolidated the
logging infrastructure.  However, the dependency should be very small and
thus easy to remove, and I would like catalyst to be usable outside of
Spark.  A pull request to make this possible would be welcome.

Ideally, we'd create some sort of spark common package that has things like
logging.  That way catalyst could depend on that, without pulling in all of
Hadoop, etc.  Maybe others have opinions though, so I'm cc-ing the dev list.


On Mon, Jul 14, 2014 at 12:21 AM, Yanbo Liang  wrote:

> Making Catalyst independent of Spark is the goal of Catalyst, but it may need
> time and evolution.
> I noticed that the package org.apache.spark.sql.catalyst.util
> imports org.apache.spark.util.{Utils => SparkUtils},
> so Catalyst has a dependency on Spark core.
> I'm not sure whether it will be replaced by another component independent of
> Spark in a later release.
>
>
> 2014-07-14 11:51 GMT+08:00 Aniket Bhatnagar :
>
> As per the recent presentation given in Scala days (
>> http://people.apache.org/~marmbrus/talks/SparkSQLScalaDays2014.pdf), it
>> was mentioned that Catalyst is independent of Spark. But on inspecting
>> pom.xml of sql/catalyst module, it seems it has a dependency on Spark Core.
>> Any particular reason for the dependency? I would love to use Catalyst
>> outside Spark
>>
>> (reposted as previous email bounced. Sorry if this is a duplicate).
>>
>
>


Re: Nested Query With Spark SQL(1.0.1)

2014-07-14 Thread Michael Armbrust
What sort of nested query are you talking about?  Right now we only support
nested queries in the FROM clause.  I'd like to add support for other cases
in the future.
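
For example, something like this should parse (a sketch against a table
registered as "people"; sqlContext is assumed to be a SQLContext):

// a subquery is currently only allowed in the FROM clause
val adults = sqlContext.sql(
  "SELECT t.name FROM (SELECT name, age FROM people WHERE age > 18) t")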


On Sun, Jul 13, 2014 at 4:11 AM, anyweil  wrote:

> Or is it supported? I know I could do it myself with filter, but if SQL
> could support it, that would be much better. Thanks!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Nested-Query-With-Spark-SQL-1-0-1-tp9544p9547.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Memory & compute-intensive tasks

2014-07-14 Thread Ravi Pandya
I'm trying to run a job that includes an invocation of a memory &
compute-intensive multithreaded C++ program, and so I'd like to run one
task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
one task per core, and so runs out of memory on the node. Is there any way
to give the scheduler a hint that the task uses lots of memory and cores so
it spreads it out more evenly?

Thanks,

Ravi Pandya
Microsoft Research


Spark 1.0.1 EC2 - Launching Applications

2014-07-14 Thread Josh Happoldt
Hi All,

I've used the spark-ec2 scripts to build a simple 1.0.1 Standalone cluster on 
EC2.  It appears that the spark-submit script is not bundled with a spark-ec2 
install.  Given that:  What is the recommended way to execute spark jobs on a 
standalone EC2 cluster?  Spark-submit provides extremely useful features that 
are still useful for EC2 deployments.

We've used workarounds like modifying the spark-classpath and using run-example 
in the past to run simple one-time EC2 jobs.  The 'Running Applications' 
section of the EC2-Scripts documentation does not mention how to actually 
submit jobs to the cluster either.

Thanks!

Josh





Re: Repeated data item search with Spark SQL(1.0.1)

2014-07-14 Thread Michael Armbrust
Handling of complex types is somewhat limited in SQL at the moment.  It'll
be more complete if you use HiveQL.

That said, the problem here is you are calling .name on an array.  You need
to pick an item from the array (using [..]) or use something like a lateral
view explode.
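
With HiveQL (via a HiveContext), a lateral view version would look roughly like
this (untested sketch; hiveContext is assumed to be a HiveContext with the
"people" table available to it):

val result = hiveContext.hql("""
  SELECT name
  FROM people
  LATERAL VIEW explode(schools) s AS school
  WHERE school.name = 'ABC'
""")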


On Sat, Jul 12, 2014 at 11:16 PM, anyweil  wrote:

> Hi All:
>
> I am using Spark SQL 1.0.1 for a simple test, the loaded data (JSON format)
> which is registered as table "people" is:
>
> {"name":"Michael",
> "schools":[{"name":"ABC","time":1994},{"name":"EFG","time":2000}]}
> {"name":"Andy", "age":30,"scores":{"eng":98,"phy":89}}
> {"name":"Justin", "age":19}
>
> the schools field has repeated values {"name":"XXX","time":X}; how should I write
> the SQL to select the people who have a school with name "ABC"? I have tried
> "SELECT name FROM people WHERE schools.name = 'ABC' ", but it seems wrong,
> failing with:
>
> [error] (run-main-0)
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
> attributes: 'name, tree:
> [error] Project ['name]
> [error]  Filter ('schools.name = ABC)
> [error]   Subquery people
> [error]ParquetRelation people.parquet, Some(Configuration:
> core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml)
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
> attributes: 'name, tree:
>
> Project ['name]
>  Filter ('schools.name = ABC)
>   Subquery people
>ParquetRelation people.parquet, Some(Configuration: core-default.xml,
> core-site.xml, mapred-default.xml, mapred-site.xml)
>
> at
>
> org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:71)
> ...
>
> Could anybody show me how to write a right SQL for the repeated data item
> search in Spark SQL? Thank you!
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Repeated-data-item-search-with-Spark-SQL-1-0-1-tp9544.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Client application that calls Spark and receives an MLlib *model* Scala Object, not just result

2014-07-14 Thread Aris Vlasakakis
Hello Spark community,

I would like to write an application in Scala that is a model server. It
should have an MLlib linear regression model that is already trained on
some big set of data, and then is able to repeatedly call
myLinearRegressionModel.predict() many times and return the result.

Now, I want this client application to submit a job to Spark and tell the
Spark cluster job to

1) train its particular MLlib model, which produces a LinearRegression
model, and then

2) take the produced Scala
org.apache.spark.mllib.regression.LinearRegressionModel *object*, serialize
that object, and return this serialized object over the wire to my calling
application.

3) My client application receives the serialized Scala (model) object, and
can call .predict() on it over and over.

I am separating the heavy lifting of training the model and doing model
predictions; the client application will only do predictions using the
MLlib model it received from the Spark application.

The confusion I have is that I only know how to "submit jobs to Spark" by
using the bin/spark-submit script, and then the only output I receive is
stdout (as in, text). I want my Scala application to submit the
Spark model-training job programmatically, and for the Spark application to
return a SERIALIZED MLLIB OBJECT, not just some stdout text!

How can I do this? I think my use case of separating long-running jobs to
Spark and using its libraries in another application should be a pretty
common design pattern.

Thanks!

-- 
Άρης Βλασακάκης
Aris Vlasakakis


Re: Client application that calls Spark and receives an MLlib *model* Scala Object, not just result

2014-07-14 Thread Soumya Simanta
Please look at the following.

https://github.com/ooyala/spark-jobserver
http://en.wikipedia.org/wiki/Predictive_Model_Markup_Language
https://github.com/EsotericSoftware/kryo

You can train your model, convert it to PMML, and return that to your client,
OR

You can train your model and write that model (serialized object) to the
file system (local, HDFS, S3 etc) or a datastore and return a location back
to the client on a successful write.
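
A minimal sketch of the second option using plain Java serialization (paths and
training parameters are placeholders; sc is an existing SparkContext, and the
client needs the same Spark/MLlib version on its classpath to deserialize):

import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.mllib.regression.{LinearRegressionModel, LinearRegressionWithSGD}
import org.apache.spark.mllib.util.MLUtils

// --- in the Spark training job ---
val data = MLUtils.loadLibSVMFile(sc, "hdfs:///training/data.libsvm")
val model = LinearRegressionWithSGD.train(data, 100)   // numIterations = 100 (placeholder)

val out = new ObjectOutputStream(new FileOutputStream("/shared/models/lr-model.bin"))
out.writeObject(model)
out.close()

// --- in the client application (no cluster needed to predict) ---
val in = new ObjectInputStream(new FileInputStream("/shared/models/lr-model.bin"))
val served = in.readObject().asInstanceOf[LinearRegressionModel]
in.close()
// served.predict(someVector) can now be called over and over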





On Mon, Jul 14, 2014 at 4:27 PM, Aris Vlasakakis 
wrote:

> Hello Spark community,
>
> I would like to write an application in Scala that i a model server. It
> should have an MLlib Linear Regression model that is already trained on
> some big set of data, and then is able to repeatedly call
> myLinearRegressionModel.predict() many times and return the result.
>
> Now, I want this client application to submit a job to Spark and tell the
> Spark cluster job to
>
> 1) train its particular MLlib model, which produces a LinearRegression
> model, and then
>
> 2) take the produced Scala
> org.apache.spark.mllib.regression.LinearRegressionModel *object*, serialize
> that object, and return this serialized object over the wire to my calling
> application.
>
> 3) My client application receives the serialized Scala (model) object, and
> can call .predict() on it over and over.
>
> I am separating the heavy lifting of training the model and doing model
> predictions; the client application will only do predictions using the
> MLlib model it received from the Spark application.
>
> The confusion I have is that I only know how to "submit jobs to Spark" by
> using the bin/spark-submit script, and then the only output I receive is
> stdout (as in, text). I want my scala appliction to hopefully submit the
> spark model-training programmatically, and for the Spark application to
> return a SERIALIZED MLLIB OBJECT, not just some stdout text!
>
> How can I do this? I think my use case of separating long-running jobs to
> Spark and using it's libraries in another application should be a pretty
> common design pattern.
>
> Thanks!
>
> --
> Άρης Βλασακάκης
> Aris Vlasakakis
>


Re: Memory & compute-intensive tasks

2014-07-14 Thread Daniel Siegmann
I don't have a solution for you (sorry), but do note that
rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you
set shuffle=true then it should repartition and redistribute the data. But
it uses the hash partitioner according to the ScalaDoc - I don't know of
any way to supply a custom partitioner.
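
For example (a sketch; numNodes is whatever your physical node count is):

// coalesce with shuffle = true forces a full hash-partitioned redistribution,
// rather than merging partitions in place on the nodes where they already live
val spread = rdd.coalesce(numNodes, shuffle = true)
// equivalently: rdd.repartition(numNodes)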


On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya  wrote:

> I'm trying to run a job that includes an invocation of a memory &
> compute-intensive multithreaded C++ program, and so I'd like to run one
> task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
> one task per core, and so runs out of memory on the node. Is there any way
> to give the scheduler a hint that the task uses lots of memory and cores so
> it spreads it out more evenly?
>
> Thanks,
>
> Ravi Pandya
> Microsoft Research
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


How to kill running spark yarn application

2014-07-14 Thread hsy...@gmail.com
Hi all,

A newbie question, I start a spark yarn application through spark-submit

How do I kill this app. I can kill the yarn app by "yarn application -kill
appid" but the application master is still running. What's the proper way
to shutdown the entire app?

Best,
Siyuan


Re: Number of executors change during job running

2014-07-14 Thread Bill Jay
Hi Tathagata,

It seems repartition does not necessarily force Spark to distribute the
data into different executors. I have launched a new job which uses
repartition right after I received data from Kafka. For the first two
batches, the reduce stage used more than 80 executors. Starting from the
third batch, there were always only 2 executors in the reduce task
(combineByKey). Even with the first batch which used more than 80
executors, it took 2.4 mins to finish the reduce stage for a very small
amount of data.

Bill


On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das  wrote:

> After using repartition(300), how many executors did it run on? By the
> way, repartition(300) means it will divide the shuffled data into 300
> partitions. Since there are many cores on each of the 300
> machines/executors, these partitions (each requiring a core) may not be
> spread across all 300 executors. Hence, if you really want to spread it across
> all 300 executors, you may have to bump up the partitions even more. However,
> increasing the number of partitions too high may not be beneficial, and you will
> have to play around with the number to figure out the sweet spot that reduces the
> time to process the stage / time to process the whole batch.
>
> TD
>
>
> On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay 
> wrote:
>
>> Hi Tathagata,
>>
>> Do you mean that the data is not shuffled until the reduce stage? That
>> means groupBy still only uses 2 machines?
>>
>> I think I used repartition(300) after I read the data from Kafka into
>> DStream. It seems that it did not guarantee that the map or reduce stages
>> will be run on 300 machines. I am currently trying to initiate 100 DStream
>> from KafkaUtils.createDStream and union them. Now the reduce stages had
>> around 80 machines for all the batches. However, this method will introduce
>> many dstreams. It will be good if we can control the number of executors in
>> the groupBy operation because the calculation needs to be finished within 1
>> minute for different size of input data based on our production need.
>>
>> Thanks!
>>
>>
>> Bill
>>
>>
>> On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Aah, I get it now. That is because the input data stream is replicated
>>> on two machines, so by locality the data is processed on those two
>>> machines. So the "map" stage on the data uses 2 executors, but in the "reduce"
>>> stage (after groupByKey) the saveAsTextFiles would use 300 tasks. And the
>>> default parallelism takes effect only when the data is explicitly
>>> shuffled around.
>>>
>>> You can fix this by explicitly repartitioning the data.
>>>
>>> inputDStream.repartition(partitions)
>>>
>>> This is covered in the streaming tuning guide.
>>>
>>> TD
>>>
>>>
>>>
>>> On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay 
>>> wrote:
>>>
 Hi folks,

 I just ran another job that only received data from Kafka, did some
 filtering, and then save as text files in HDFS. There was no reducing work
 involved. Surprisingly, the number of executors for the saveAsTextFiles
 stage was also 2 although I specified 300 executors in the job submission.
 As a result, the simple save file action took more than 2 minutes. Do you
 have any idea how Spark determined the number of executors
 for different stages?

 Thanks!

 Bill


 On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay 
 wrote:

> Hi Tathagata,
>
> Below is my main function. I omit some filtering and data conversion
> functions. These functions are just one-to-one mappings, which should not
> noticeably increase running time. The only reduce function I have here is
> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
> have 240k lines each minute. And the other two topics have less than 30k
> lines per minute. The batch size is one minute and I specified 300
> executors in my spark-submit script. The default parallelism is 300.
>
>
> val parition = 300
> val zkQuorum = "zk1,zk2,zk3"
> val group = "my-group-" + currentTime.toString
> val topics = "topic1,topic2,topic3,topic4"
> val numThreads = 4
> val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
> ssc = new StreamingContext(conf, Seconds(batch))
> ssc.checkpoint(hadoopOutput + "checkpoint")
> val lines = lines1
> lines.cache()
> val jsonData = lines.map(JSON.parseFull(_))
> val mapData = jsonData.filter(_.isDefined)
>
> .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
> val validMapData = mapData.filter(isValidData(_))
> val fields = validMapData.map(data => (data("id").toString,
> timestampToUTCUnix(data("time").toString),
>
>  timestampToUTCUnix(data("local_time").toStr

Re: Memory & compute-intensive tasks

2014-07-14 Thread Matei Zaharia
I think coalesce with shuffle=true will force it to have one task per node. 
Without that, it might be that due to data locality it decides to launch 
multiple ones on the same node even though the total # of tasks is equal to the 
# of nodes.

If this is the *only* thing you run on the cluster, you could also configure 
the Workers to only report one core by manually launching the 
spark.deploy.worker.Worker process with that flag (see 
http://spark.apache.org/docs/latest/spark-standalone.html).

Matei

On Jul 14, 2014, at 1:59 PM, Daniel Siegmann  wrote:

> I don't have a solution for you (sorry), but do note that 
> rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you set 
> shuffle=true then it should repartition and redistribute the data. But it 
> uses the hash partitioner according to the ScalaDoc - I don't know of any way 
> to supply a custom partitioner.
> 
> 
> On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya  wrote:
> I'm trying to run a job that includes an invocation of a memory & 
> compute-intensive multithreaded C++ program, and so I'd like to run one task 
> per physical node. Using rdd.coalesce(# nodes) seems to just allocate one 
> task per core, and so runs out of memory on the node. Is there any way to 
> give the scheduler a hint that the task uses lots of memory and cores so it 
> spreads it out more evenly?
> 
> Thanks,
> 
> Ravi Pandya
> Microsoft Research
> 
> 
> 
> -- 
> Daniel Siegmann, Software Developer
> Velos
> Accelerating Machine Learning
> 
> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
> E: daniel.siegm...@velos.io W: www.velos.io



Re: Catalyst dependency on Spark Core

2014-07-14 Thread Matei Zaharia
Yeah, I'd just add a spark-util that has these things.

Matei

On Jul 14, 2014, at 1:04 PM, Michael Armbrust  wrote:

> Yeah, sadly this dependency was introduced when someone consolidated the 
> logging infrastructure.  However, the dependency should be very small and 
> thus easy to remove, and I would like catalyst to be usable outside of Spark. 
>  A pull request to make this possible would be welcome.
> 
> Ideally, we'd create some sort of spark common package that has things like 
> logging.  That way catalyst could depend on that, without pulling in all of 
> Hadoop, etc.  Maybe others have opinions though, so I'm cc-ing the dev list.
> 
> 
> On Mon, Jul 14, 2014 at 12:21 AM, Yanbo Liang  wrote:
> Making Catalyst independent of Spark is the goal of Catalyst, but that may need time
> and evolution.
> I noticed that the package org.apache.spark.sql.catalyst.util uses
> org.apache.spark.util.{Utils => SparkUtils},
> so Catalyst has a dependency on Spark core.
> I'm not sure whether it will be replaced by another component independent of
> Spark in a later release.
> 
> 
> 2014-07-14 11:51 GMT+08:00 Aniket Bhatnagar :
> 
> As per the recent presentation given in Scala days 
> (http://people.apache.org/~marmbrus/talks/SparkSQLScalaDays2014.pdf), it was 
> mentioned that Catalyst is independent of Spark. But on inspecting pom.xml of 
> sql/catalyst module, it seems it has a dependency on Spark Core. Any 
> particular reason for the dependency? I would love to use Catalyst outside 
> Spark
> 
> (reposted as previous email bounced. Sorry if this is a duplicate).
> 
> 



Re: Spark 1.0.1 EC2 - Launching Applications

2014-07-14 Thread Matei Zaharia
The script should be there, in the spark/bin directory. What command did you 
use to launch the cluster?

Matei

On Jul 14, 2014, at 1:12 PM, Josh Happoldt  wrote:

> Hi All,
> 
> I've used the spark-ec2 scripts to build a simple 1.0.1 Standalone cluster on 
> EC2.  It appears that the spark-submit script is not bundled with a spark-ec2 
> install.  Given that:  What is the recommended way to execute spark jobs on a 
> standalone EC2 cluster?  Spark-submit provides extremely useful features that 
> are still useful for EC2 deployments.
> 
> We've used workarounds like modifying the spark-classpath and using 
> run-example in the past to run simple one-time EC2 jobs.  The 'Running 
> Applications' section of the EC2-Scripts documentation does not mention how 
> to actually submit jobs to the cluster either.
> 
> Thanks!
> 
> Josh
> 
> 
> 



Re: Memory & compute-intensive tasks

2014-07-14 Thread Daniel Siegmann
Depending on how your C++ program is designed, maybe you can feed the data
from multiple partitions into the same process? Getting the results back
might be tricky. But that may be the only way to guarantee you're only
using one invocation per node.


On Mon, Jul 14, 2014 at 5:12 PM, Matei Zaharia 
wrote:

> I think coalesce with shuffle=true will force it to have one task per
> node. Without that, it might be that due to data locality it decides to
> launch multiple ones on the same node even though the total # of tasks is
> equal to the # of nodes.
>
> If this is the *only* thing you run on the cluster, you could also
> configure the Workers to only report one core by manually launching the
> spark.deploy.worker.Worker process with that flag (see
> http://spark.apache.org/docs/latest/spark-standalone.html).
>
> Matei
>
> On Jul 14, 2014, at 1:59 PM, Daniel Siegmann 
> wrote:
>
> I don't have a solution for you (sorry), but do note that
> rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you
> set shuffle=true then it should repartition and redistribute the data.
> But it uses the hash partitioner according to the ScalaDoc - I don't know
> of any way to supply a custom partitioner.
>
>
> On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya  wrote:
>
>> I'm trying to run a job that includes an invocation of a memory &
>> compute-intensive multithreaded C++ program, and so I'd like to run one
>> task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
>> one task per core, and so runs out of memory on the node. Is there any way
>> to give the scheduler a hint that the task uses lots of memory and cores so
>> it spreads it out more evenly?
>>
>> Thanks,
>>
>> Ravi Pandya
>> Microsoft Research
>>
>
>
>
> --
> Daniel Siegmann, Software Developer
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
> E: daniel.siegm...@velos.io W: www.velos.io
>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: pyspark sc.parallelize running OOM with smallish data

2014-07-14 Thread Mohit Jaggi
Continuing to debug with Scala, I tried this on local with enough memory
(10g) and it is able to count the dataset. With more memory(for executor
and driver) in a cluster it still fails. The data is about 2Gbytes. It is
30k * 4k doubles.


On Sat, Jul 12, 2014 at 6:31 PM, Aaron Davidson  wrote:

> I think this is probably dying on the driver itself, as you are probably
> materializing the whole dataset inside your python driver. How large is
> spark_data_array compared to your driver memory?
>
>
> On Fri, Jul 11, 2014 at 7:30 PM, Mohit Jaggi  wrote:
>
>> I put the same dataset into scala (using spark-shell) and it acts weird.
>> I cannot do a count on it, the executors seem to hang. The WebUI shows 0/96
>> in the status bar, shows details about the worker nodes but there is no
>> progress.
>> sc.parallelize does finish (takes too long for the data size) in scala.
>>
>>
>> On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi 
>> wrote:
>>
>>> spark_data_array here has about 35k rows with 4k columns. I have 4 nodes
>>> in the cluster and gave 48g to executors. Also tried Kryo serialization.
>>>
>>> traceback (most recent call last):
>>>
>>>   File "/mohit/./m.py", line 58, in 
>>>
>>> spark_data = sc.parallelize(spark_data_array)
>>>
>>>   File "/mohit/spark/python/pyspark/context.py", line 265, in parallelize
>>>
>>> jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
>>>
>>>   File
>>> "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line
>>> 537, in __call__
>>>
>>>   File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py",
>>> line 300, in get_return_value
>>>
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
>>>
>>> : java.lang.OutOfMemoryError: Java heap space
>>>
>>> at
>>> org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)
>>>
>>> at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>>
>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>>>
>>> at py4j.Gateway.invoke(Gateway.java:259)
>>>
>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>
>>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>
>>
>


Re: Spark Streaming Json file groupby function

2014-07-14 Thread srinivas
Hi,
  Thanks for your reply... I imported StreamingContext and right now I am
getting my DStream as something like
 map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type
-> math)
 map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type
->sci)
 map(id -> 432, name ->, mobile ->423141234,score -> 322,test_type ->
math)

Each map collection is from a JSON string. Now, if I want to aggregate the scores
for only math, or if I want to find out who got the highest score in math (showing
both name and score), I would like to know what transformation I should apply
to my existing DStream. I am very new to dealing with maps and DStream
transformations, so please advise on how to proceed from here.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9656.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to kill running spark yarn application

2014-07-14 Thread Jerry Lam
Hi Siyuan,

I wonder if you --master yarn-cluster or yarn-client?

Best Regards,

Jerry


On Mon, Jul 14, 2014 at 5:08 PM, hsy...@gmail.com  wrote:

> Hi all,
>
> A newbie question, I start a spark yarn application through spark-submit
> 
> How do I kill this app. I can kill the yarn app by "yarn application -kill
> appid" but the application master is still running. What's the proper way
> to shutdown the entire app?
>
> Best,
> Siyuan
>


Re: SparkR failed to connect to the master

2014-07-14 Thread cjwang
I restarted Spark Master with spark-0.9.1 and SparkR was able to communicate
with the Master.  I am using the latest SparkR pkg-e1f95b6.  Maybe it has a
problem communicating with Spark 1.0.0?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-failed-to-connect-to-the-master-tp9359p9658.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Parsing Json object definition spanning multiple lines

2014-07-14 Thread SK
Hi,

I have a json file where the definition of each object spans multiple lines.
An example of one object definition appears below.

 {
"name": "16287e9cdf",
"width": 500,
"height": 325,
"width": 1024,
"height": 665,
"obj": [
  {
"x": 395.08,
"y": 82.09,
"w": 185.48677,
"h": 185.48677,
"min": 50,
"max": 59,
"attr1": 2,
"attr2": 68,
"attr3": 8
  },
  {
"x": 519.1,
"y": 225.8,
"w": 170,
"h": 171,
"min": 20,
"max": 29,
"attr1": 7,
"attr2": 93,
"attr3": 10
  }
   ]
}

I used the following Spark code to parse the file. However, the parsing is
failing because I think it expects one Json object definition per line. I
can try to preprocess the input file  to remove the new lines, but I would
like to know if it is possible to parse a Json object definition that spans
multiple lines, directly in Spark.  

val inp = sc.textFile(args(0))
val res = inp.map(line => { parse(line) })
   .map(json =>
  {
 implicit lazy val formats =
org.json4s.DefaultFormats
 val image = (json \ "name").extract[String]
  }
)


Thanks for  your help.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-Json-object-definition-spanning-multiple-lines-tp9659.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to kill running spark yarn application

2014-07-14 Thread hsy...@gmail.com
yarn-cluster


On Mon, Jul 14, 2014 at 2:44 PM, Jerry Lam  wrote:

> Hi Siyuan,
>
> I wonder if you --master yarn-cluster or yarn-client?
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Jul 14, 2014 at 5:08 PM, hsy...@gmail.com 
> wrote:
>
>> Hi all,
>>
>> A newbie question, I start a spark yarn application through spark-submit
>> 
>> How do I kill this app. I can kill the yarn app by "yarn application
>> -kill appid" but the application master is still running. What's the proper
>> way to shutdown the entire app?
>>
>> Best,
>> Siyuan
>>
>
>


Re: Spark Streaming Json file groupby function

2014-07-14 Thread srinivas
Hi,
  Thanks for your reply... I imported StreamingContext and right now I am
getting my DStream as something like
 map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type
-> math)
 map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type
->sci)
 map(id -> 432, name ->, mobile ->423141234,score -> 322,test_type ->
math)

Each map collection is from a JSON string. Now, if I want to aggregate the scores
for only math, or if I want to find out who got the highest score in math (showing
both name and score), I would like to know what transformation I should apply
to my existing DStream. I am very new to dealing with maps and DStream
transformations, so please advise on how to proceed from here.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9661.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Number of executors change during job running

2014-07-14 Thread Tathagata Das
Can you give me a screenshot of the stages page in the web UI, the Spark
logs, and the code that is causing this behavior? This seems quite weird to
me.

TD


On Mon, Jul 14, 2014 at 2:11 PM, Bill Jay 
wrote:

> Hi Tathagata,
>
> It seems repartition does not necessarily force Spark to distribute the
> data into different executors. I have launched a new job which uses
> repartition right after I received data from Kafka. For the first two
> batches, the reduce stage used more than 80 executors. Starting from the
> third batch, there were always only 2 executors in the reduce task
> (combineByKey). Even with the first batch which used more than 80
> executors, it took 2.4 mins to finish the reduce stage for a very small
> amount of data.
>
> Bill
>
>
> On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> After using repartition(300), how many executors did it run on? By the
>> way, repartition(300) means it will divide the shuffled data into 300
>> partitions. Since there are many cores on each of the 300
>> machines/executors, these partitions (each requiring a core) may not be
>> spread across all 300 executors. Hence, if you really want to spread it across
>> all 300 executors, you may have to bump up the partitions even more. However,
>> increasing the number of partitions too high may not be beneficial, and you will
>> have to play around with the number to figure out the sweet spot that reduces the
>> time to process the stage / time to process the whole batch.
>>
>> TD
>>
>>
>> On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay 
>> wrote:
>>
>>> Hi Tathagata,
>>>
>>> Do you mean that the data is not shuffled until the reduce stage? That
>>> means groupBy still only uses 2 machines?
>>>
>>> I think I used repartition(300) after I read the data from Kafka into
>>> DStream. It seems that it did not guarantee that the map or reduce stages
>>> will be run on 300 machines. I am currently trying to initiate 100 DStream
>>> from KafkaUtils.createDStream and union them. Now the reduce stages had
>>> around 80 machines for all the batches. However, this method will introduce
>>> many dstreams. It will be good if we can control the number of executors in
>>> the groupBy operation because the calculation needs to be finished within 1
>>> minute for different size of input data based on our production need.
>>>
>>> Thanks!
>>>
>>>
>>> Bill
>>>
>>>
>>> On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Aah, I get it now. That is because the input data stream is replicated
 on two machines, so by locality the data is processed on those two
 machines. So the "map" stage on the data uses 2 executors, but in the "reduce"
 stage (after groupByKey) the saveAsTextFiles would use 300 tasks. And the
 default parallelism takes effect only when the data is explicitly
 shuffled around.

 You can fix this by explicitly repartitioning the data.

 inputDStream.repartition(partitions)

 This is covered in the streaming tuning guide.

 TD



 On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay 
 wrote:

> Hi folks,
>
> I just ran another job that only received data from Kafka, did some
> filtering, and then save as text files in HDFS. There was no reducing work
> involved. Surprisingly, the number of executors for the saveAsTextFiles
> stage was also 2 although I specified 300 executors in the job submission.
> As a result, the simple save file action took more than 2 minutes. Do you
> have any idea how Spark determined the number of executors
> for different stages?
>
> Thanks!
>
> Bill
>
>
> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay 
> wrote:
>
>> Hi Tathagata,
>>
>> Below is my main function. I omit some filtering and data conversion
>> functions. These functions are just one-to-one mappings, which should not
>> noticeably increase running time. The only reduce function I have here is
>> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
>> have 240k lines each minute. And the other two topics have less than 30k
>> lines per minute. The batch size is one minute and I specified 300
>> executors in my spark-submit script. The default parallelism is 300.
>>
>>
>> val parition = 300
>> val zkQuorum = "zk1,zk2,zk3"
>> val group = "my-group-" + currentTime.toString
>> val topics = "topic1,topic2,topic3,topic4"
>> val numThreads = 4
>> val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>> ssc = new StreamingContext(conf, Seconds(batch))
>> ssc.checkpoint(hadoopOutput + "checkpoint")
>> val lines = lines1
>> lines.cache()
>> val jsonData = lines.map(JSON.parseFull(_))

Re: Spark Streaming Json file groupby function

2014-07-14 Thread Tathagata Das
In general it may be a better idea to actually convert the records from
hashmaps, to a specific data structure. Say

case class Record(id: Int, name: String, mobile: String, score: Int,
test_type: String ... )

Then you should be able to do something like

val records = jsonf.map(m => convertMapToRecord(m))

Then to filter only math results you can do records.filter(r => r.test_type
== "math"). ...

If you have to do aggregations (sum, max, etc.) you have to figure out
whether you want to aggregate in every batch, or aggregate over a window of
time.

If you want to do each batch, then

filteredRecords.foreachRDD(rdd => {
   // get aggregates for each batch
})

If you want to do across a window of time (say 1 minute), then

filteredRecords.window(Minutes(1)).foreachRDD( rdd => {
   // get aggregates over last 1 minute, every 10 seconds (since 10 second
is the batch interval)
})
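
A possible shape for convertMapToRecord, filling in the Record sketch above
with just the five fields visible in the example maps (the defaults and the
trimming are assumptions, not from this thread):

case class Record(id: Int, name: String, mobile: String, score: Int, test_type: String)

def convertMapToRecord(m: Map[String, Any]): Record = Record(
  id = m.getOrElse("id", 0).toString.trim.toDouble.toInt,   // JSON numbers usually arrive as Double
  name = m.getOrElse("name", "").toString.trim,
  mobile = m.getOrElse("mobile", "").toString.trim,
  score = m.getOrElse("score", 0).toString.trim.toDouble.toInt,
  test_type = m.getOrElse("test_type", "").toString.trim
)

// Highest math score per batch, as asked earlier in the thread
// (records is the DStream[Record] built above):
val mathRecords = records.filter(_.test_type == "math")
mathRecords.foreachRDD { rdd =>
  rdd.map(r => (r.score, r.name)).top(1).foreach {
    case (score, name) => println(s"highest math score: $name scored $score")
  }
}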




On Mon, Jul 14, 2014 at 3:06 PM, srinivas  wrote:

> Hi,
>   Thanks for ur reply...i imported StreamingContext and right now i am
> getting my Dstream as something like
>  map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type
> -> math)
>  map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type
> ->sci)
>  map(id -> 432, name ->, mobile ->423141234,score -> 322,test_type ->
> math)
>
> each map collection is from json string. now if i want aggregrate the
> scores
> on only math or if i want to find out who got the highest score in math
> that
> shows both name and score..i would like to what transformation should i do
> to my existing dstream.I am very new to dealing with maps and dstream
> transformations..so please advise on how to proceed from here.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9661.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


import org.apache.spark.streaming.twitter._ in Shell

2014-07-14 Thread durin
I'm using spark > 1.0.0 (three weeks old build of latest). 
Along the lines of this tutorial
(http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html),
I want to read some tweets from twitter.
When trying to execute  in the Spark-Shell, I get

The tutorial builds an app via sbt/sbt. Are there any special requirements
for importing the TwitterUtils in the shell?


Best regards,
Simon




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/import-org-apache-spark-streaming-twitter-in-Shell-tp9665.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: import org.apache.spark.streaming.twitter._ in Shell

2014-07-14 Thread Tathagata Das
The twitter functionality is not available through the shell.
1) we separated these non-core functionality into separate subprojects so
that their dependencies do not collide/pollute those of of core spark
2) a shell is not really the best way to start a long running stream.

Its best to use twitter through a separate project.

TD


On Mon, Jul 14, 2014 at 3:47 PM, durin  wrote:

> I'm using spark > 1.0.0 (three weeks old build of latest).
> Along the lines of  this tutorial
> <
> http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html
> >
> , I want to read some tweets from twitter.
> When trying to execute  in the Spark-Shell, I get
>
> The tutorial builds an app via sbt/sbt. Are there any special requirements
> for importing the TwitterUtils in the shell?
>
>
> Best regards,
> Simon
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/import-org-apache-spark-streaming-twitter-in-Shell-tp9665.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Change when loading/storing String data using Parquet

2014-07-14 Thread Michael Armbrust
I just wanted to send out a quick note about a change in the handling of
strings when loading / storing data using parquet and Spark SQL.  Before,
Spark SQL did not support binary data in Parquet, so all binary blobs were
implicitly treated as Strings.  9fe693

fixes
this limitation by adding support for binary data.

However, data written out with a prior version of Spark SQL will be missing
the annotation telling us to interpret a given column as a String, so old
string data will now be loaded as binary data.  If you would like to use
the data as a string, you will need to add a CAST to convert the datatype.
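
A rough sketch of that CAST workaround, assuming an existing SparkContext sc
(the file path, table name, and column name are placeholders):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Data written by an older Spark SQL version: the string column comes back as binary.
val oldData = sqlContext.parquetFile("hdfs:///warehouse/old_strings.parquet")
oldData.registerAsTable("old_strings")

// CAST(... AS STRING) reinterprets the un-annotated binary column as text.
val asStrings = sqlContext.sql("SELECT CAST(name AS STRING) AS name FROM old_strings")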

New string data written out after this change, will correctly be loaded in
as a string as now we will include an annotation about the desired type.
 Additionally, this should now interoperate correctly with other systems
that write Parquet data (hive, thrift, etc).

Michael


SQL + streaming

2014-07-14 Thread hsy...@gmail.com
Hi All,

Couple days ago, I tried to integrate SQL and streaming together. My
understanding is I can transform RDD from Dstream to schemaRDD and execute
SQL on each RDD. But I got no luck
Would you guys help me take a look at my code?  Thank you very much!

object KafkaSpark {

  def main(args: Array[String]): Unit = {
if (args.length < 4) {
  System.err.println("Usage: KafkaSpark   
")
  System.exit(1)
}


val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaSpark")
val ssc =  new StreamingContext(sparkConf, Seconds(10))
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc);
//ssc.checkpoint("checkpoint")

// Importing the SQL context gives access to all the SQL functions and
implicit conversions.
import sqlContext._


val tt = Time(1)
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
topicpMap).map(t => getRecord(t._2.split("#")))

val result = recordsStream.foreachRDD((recRDD, tt)=>{
  recRDD.registerAsTable("records")
  val result = sql("select * from records")
  println(result)
  result.foreach(println)
})

ssc.start()
ssc.awaitTermination()

  }

  def getRecord(l:Array[String]):Record = {
println("Getting the record")
Record(l(0), l(1))}
}


Re: Stateful RDDs?

2014-07-14 Thread Tathagata Das
Trying to answer your questions as concisely as possible.

1. In the current implementation, the entire state RDD needs to be loaded for
any update. It is a known limitation that we want to overcome in the
future. Therefore the state DStream should not be persisted to disk, as all
the data in the state RDDs is touched in every batch. Since Spark
Streaming is not really a dedicated data store, it's not really designed to
separate out hot data and cold data.
2. For each key, in the state you could maintain a timestamp of when it was
updated and accordingly return None to filter that state out. Regarding
filtering by the minimum key, there may be a way to periodically figure out
the minimum key at the driver, then propagate out that information to the
executors (update a static variable in the executors) and use that to
filter out the keys.
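
A minimal sketch of the timestamp idea in point 2 (the Event type, the merge
logic, and the one-day cutoff are assumptions, not something from this thread):

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

case class Event(amount: Long)
case class TimestampedState(total: Long, lastUpdated: Long)

val maxAgeMs = 24 * 60 * 60 * 1000L  // drop keys that have been cold for a day

def updateFunc(events: Seq[Event], state: Option[TimestampedState]): Option[TimestampedState] = {
  val now = System.currentTimeMillis()
  if (events.nonEmpty) {
    val total = state.map(_.total).getOrElse(0L) + events.map(_.amount).sum
    Some(TimestampedState(total, now))
  } else {
    // No updates this batch: returning None removes the key from the state RDD.
    state.filter(s => now - s.lastUpdated < maxAgeMs)
  }
}

// events: DStream[(String, Event)] keyed by, e.g., session id
def withState(events: DStream[(String, Event)]): DStream[(String, TimestampedState)] =
  events.updateStateByKey(updateFunc _)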

Hope this helps.

TD



On Thu, Jul 10, 2014 at 10:25 AM, Sargun Dhillon  wrote:

> So, one portion of our Spark streaming application requires some
> state. Our application takes a bunch of application events (i.e.
> user_session_started, user_session_ended, etc..), and calculates out
> metrics from these, and writes them to a serving layer (see: Lambda
> Architecture). Two related events can be ingested into the streaming
> context moments apart, or time inderminate. Given this, and the fact
> that our normal windows pump data out every 500-1 ms, with a step
> of 500ms, you might end up with two related pieces of data across two
> windows. In order to work around this, we go ahead and do
> updateStateByKey to persist state, as opposed to persisting key
> intermediate state in some external system, as building a system to
> handle the complexities of (concurrent, idempotent) updates, as well
> as ensure scalability is non-trivial.
>
> The questions I have around this, is even in a highly-partitionable
> dataset, what's the upper "scalability" limits with stateful dstreams?
> If I have a dataset, starting at around 10-million keys, growing at
> that rate monthly, what are the complexities within? Most of the data
> is cold. I realize that I can remove data from the stateful dstream,
> by sending (key, null) to it, but there is not necessarily an easy way
> of knowing when the last update is coming in (unless there is some way
> in spark of saying, "Wait N windows, and send this tuple" or "Wait
> until all keys in the upstream Dstreams smaller than M are processed"
> before sending such a tuple. Additionally, given that my data is
> partitionable by datetime, does it make sense to have a custom
> datetime partitioner, and just persist the dstream to disk, to ensure
> that its RDDs are only pulled off of disk (into memory) occasionally?
> What's the cost of having a bunch of relatively large, stateful RDDs
> around on disk? Does Spark have to load / deserialize the entire RDD
> to update one key?
>


Spark-Streaming collect/take functionality.

2014-07-14 Thread jon.burns
Hello everyone,

I'm an undergrad working on a summarization project. I've created a
summarizer in normal Spark and it works great, however I want to write it
for Spark_Streaming to increase it's functionality. Basically I take in a
bunch of text and get the most popular words as well as most popular
bi-grams (Two words together), and I've managed to do this with streaming
(And made it stateful, which is great). However the next part of my
algorithm requires me to get the top 10 words and top 10 bigrams and store
them in a vector like structure. With just spark I would use code like;

array_of_words = words.sortByKey().top(50)

Is there a way to mimic this with streaming? I was following along with the
ampcamp tutorial
(http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html)
so I know that you can print the top 10 by using:

sortedCounts.foreach(rdd =>
  println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n")))

However I can't seem to alter this to make it store the top 10, just print
them. The instructor mentions at the end that

"one can get the top 10 hashtags in each partition, collect them together at
the driver and then find the top 10 hashtags among them" but they leave it
as an exercise. I would appreciate any help :)

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-collect-take-functionality-tp9670.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to kill running spark yarn application

2014-07-14 Thread Jerry Lam
Then yarn application -kill appid should work. This is what I did 2 hours ago.

Sorry I cannot provide more help.


Sent from my iPhone

> On 14 Jul, 2014, at 6:05 pm, "hsy...@gmail.com"  wrote:
> 
> yarn-cluster
> 
> 
>> On Mon, Jul 14, 2014 at 2:44 PM, Jerry Lam  wrote:
>> Hi Siyuan,
>> 
>> I wonder if you --master yarn-cluster or yarn-client?
>> 
>> Best Regards,
>> 
>> Jerry
>> 
>> 
>>> On Mon, Jul 14, 2014 at 5:08 PM, hsy...@gmail.com  wrote:
>>> Hi all,
>>> 
>>> A newbie question, I start a spark yarn application through spark-submit 
>>> 
>>> How do I kill this app. I can kill the yarn app by "yarn application -kill 
>>> appid" but the application master is still running. What's the proper way 
>>> to shutdown the entire app?
>>> 
>>> Best,
>>> Siyuan
> 


Re: How to kill running spark yarn application

2014-07-14 Thread hsy...@gmail.com
Before "yarn application -kill" If you do jps You'll have a list
of SparkSubmit and ApplicationMaster

After you use yarn applicaton -kill you only kill the SparkSubmit



On Mon, Jul 14, 2014 at 4:29 PM, Jerry Lam  wrote:

> Then yarn application -kill appid should work. This is what I did 2 hours
> ago.
>
> Sorry I cannot provide more help.
>
>
> Sent from my iPhone
>
> On 14 Jul, 2014, at 6:05 pm, "hsy...@gmail.com"  wrote:
>
> yarn-cluster
>
>
> On Mon, Jul 14, 2014 at 2:44 PM, Jerry Lam  wrote:
>
>> Hi Siyuan,
>>
>> I wonder if you --master yarn-cluster or yarn-client?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Mon, Jul 14, 2014 at 5:08 PM, hsy...@gmail.com 
>> wrote:
>>
>>> Hi all,
>>>
>>> A newbie question, I start a spark yarn application through spark-submit
>>> 
>>> How do I kill this app. I can kill the yarn app by "yarn application
>>> -kill appid" but the application master is still running. What's the proper
>>> way to shutdown the entire app?
>>>
>>> Best,
>>> Siyuan
>>>
>>
>>
>


Re: SQL + streaming

2014-07-14 Thread Tathagata Das
Could you elaborate on what is the problem you are facing? Compiler error?
Runtime error? Class-not-found error? Not receiving any data from Kafka?
Receiving data but SQL command throwing error? No errors but no output
either?

TD


On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com  wrote:

> Hi All,
>
> Couple days ago, I tried to integrate SQL and streaming together. My
> understanding is I can transform RDD from Dstream to schemaRDD and execute
> SQL on each RDD. But I got no luck
> Would you guys help me take a look at my code?  Thank you very much!
>
> object KafkaSpark {
>
>   def main(args: Array[String]): Unit = {
> if (args.length < 4) {
>   System.err.println("Usage: KafkaSpark   
> ")
>   System.exit(1)
> }
>
>
> val Array(zkQuorum, group, topics, numThreads) = args
> val sparkConf = new SparkConf().setAppName("KafkaSpark")
> val ssc =  new StreamingContext(sparkConf, Seconds(10))
> val sc = new SparkContext(sparkConf)
> val sqlContext = new SQLContext(sc);
> //ssc.checkpoint("checkpoint")
>
> // Importing the SQL context gives access to all the SQL functions and
> implicit conversions.
> import sqlContext._
>
>
> val tt = Time(1)
> val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
> val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
> topicpMap).map(t => getRecord(t._2.split("#")))
>
> val result = recordsStream.foreachRDD((recRDD, tt)=>{
>   recRDD.registerAsTable("records")
>   val result = sql("select * from records")
>   println(result)
>   result.foreach(println)
> })
>
> ssc.start()
> ssc.awaitTermination()
>
>   }
>
>   def getRecord(l:Array[String]):Record = {
> println("Getting the record")
> Record(l(0), l(1))}
> }
>
>


Re: Spark-Streaming collect/take functionality.

2014-07-14 Thread Tathagata Das
Why doesn't something like this work? If you want a continuously updated
reference to the top counts, you can use a global variable.

var topCounts: Array[(String, Int)] = null

sortedCounts.foreachRDD { rdd =>
  val currentTopCounts = rdd.take(10)
  // print currentTopCounts or whatever
  topCounts = currentTopCounts
}
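
If you would rather do the "top 10 per partition, then merge at the driver"
exercise directly, a minimal sketch (assuming counts is a DStream[(String, Int)]
of word counts; names here are just placeholders) could be:

import org.apache.spark.streaming.dstream.DStream

var topWords: Array[(String, Int)] = Array.empty

def trackTop(counts: DStream[(String, Int)], k: Int): Unit = {
  counts.foreachRDD { rdd =>
    // top() takes the k largest elements of each partition and merges them at
    // the driver, which is exactly the exercise the tutorial describes.
    topWords = rdd.top(k)(Ordering.by[(String, Int), Int](_._2))
    println(topWords.mkString("\n"))
  }
}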

TD


On Mon, Jul 14, 2014 at 4:11 PM, jon.burns  wrote:

> Hello everyone,
>
> I'm an undergrad working on a summarization project. I've created a
> summarizer in normal Spark and it works great, however I want to write it
> for Spark Streaming to increase its functionality. Basically I take in a
> bunch of text and get the most popular words as well as most popular
> bi-grams (Two words together), and I've managed to do this with streaming
> (And made it stateful, which is great). However the next part of my
> algorithm requires me to get the top 10 words and top 10 bigrams and store
> them in a vector like structure. With just spark I would use code like;
>
> array_of_words = words.sortByKey().top(50)
>
> Is there a way to mimic this with streaming? I was following along with
> the
> ampcamp  tutorial
> <
> http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html
> >
> so I know that you can print the top 10 by using;
>
> sortedCounts.foreach(rdd =>
>   println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n")))
>
> However I can't seem to alter this to make it store the top 10, just print
> them. The instructor mentions at the end that
>
> "one can get the top 10 hashtags in each partition, collect them together
> at
> the driver and then find the top 10 hashtags among them" but they leave it
> as an exercise. I would appreciate any help :)
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-collect-take-functionality-tp9670.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Possible bug in Spark Streaming :: TextFileStream

2014-07-14 Thread Tathagata Das
Oh yes, this was a bug and it has been fixed. Check out from the master
branch!

https://issues.apache.org/jira/browse/SPARK-2362?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20created%20DESC%2C%20priority%20ASC

TD


On Mon, Jul 7, 2014 at 7:11 AM, Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com> wrote:

> I have a basic spark streaming job that is watching a folder, processing
> any new file and updating a column family in cassandra using the new
> cassandra-spark-driver.
>
> I think there is a problem with SparkStreamingContext.textFileStream... if
> I start my job in local mode with no files in the folder that is watched
> and then I copy a bunch of files, sometimes spark is continually processing
> those files again and again.
>
> I have noticed that it usually happens when spark doesn't detect all new
> files in one go... i.e. I copied 6 files and spark detected 3 of them as
> new and processed them; then it detected the other 3 as new and processed
> them. After it finished to process all 6 files, it detected again the first
> 3 files as new files and processed them... then the other 3... and again...
> and again... and again.
>
> Should I raise a JIRA issue?
>
> Regards,
>
> Luis
>


running spark from intellj

2014-07-14 Thread jamborta
hi all,

I have a simple example that reads a file in and counts the number of rows as
follows:

val conf = new SparkConf()
  .setMaster("spark://spark-master:7077")
  .setAppName("Test")
  .set("spark.executor.memory", "256m")
val sc = new SparkContext(conf)
val data = sc.textFile("/data/df_data.txt").map { line =>
  line.split(',')
}.cache()
println(data.count())

I can run this using spark-submit, but could not figure out how to run it
from an IDE (i.e. IntelliJ). Not sure what the correct settings are.

(I am getting the following error, I assume this is a config issue)

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0.0:0 failed 4 times, most recent failure: TID 7 on
host spark-master.local failed for unknown reason
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/running-spark-from-intellj-tp9676.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: SQL + streaming

2014-07-14 Thread hsy...@gmail.com
No errors but no output either... Thanks!


On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das 
wrote:

> Could you elaborate on what is the problem you are facing? Compiler error?
> Runtime error? Class-not-found error? Not receiving any data from Kafka?
> Receiving data but SQL command throwing error? No errors but no output
> either?
>
> TD
>
>
> On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com 
> wrote:
>
>> Hi All,
>>
>> Couple days ago, I tried to integrate SQL and streaming together. My
>> understanding is I can transform RDD from Dstream to schemaRDD and execute
>> SQL on each RDD. But I got no luck
>> Would you guys help me take a look at my code?  Thank you very much!
>>
>> object KafkaSpark {
>>
>>   def main(args: Array[String]): Unit = {
>> if (args.length < 4) {
>>   System.err.println("Usage: KafkaSpark   
>> ")
>>   System.exit(1)
>> }
>>
>>
>> val Array(zkQuorum, group, topics, numThreads) = args
>> val sparkConf = new SparkConf().setAppName("KafkaSpark")
>> val ssc =  new StreamingContext(sparkConf, Seconds(10))
>> val sc = new SparkContext(sparkConf)
>> val sqlContext = new SQLContext(sc);
>> //ssc.checkpoint("checkpoint")
>>
>> // Importing the SQL context gives access to all the SQL functions
>> and implicit conversions.
>> import sqlContext._
>>
>>
>> val tt = Time(1)
>> val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
>> val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
>> topicpMap).map(t => getRecord(t._2.split("#")))
>>
>> val result = recordsStream.foreachRDD((recRDD, tt)=>{
>>   recRDD.registerAsTable("records")
>>   val result = sql("select * from records")
>>   println(result)
>>   result.foreach(println)
>> })
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>>   }
>>
>>   def getRecord(l:Array[String]):Record = {
>> println("Getting the record")
>> Record(l(0), l(1))}
>> }
>>
>>
>


Re: import org.apache.spark.streaming.twitter._ in Shell

2014-07-14 Thread durin
Thanks. Can I see that a Class is not available in the shell somewhere in the
API Docs or do I have to find out by trial and error?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/import-org-apache-spark-streaming-twitter-in-Shell-tp9665p9678.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Error when testing with large sparse svm

2014-07-14 Thread Xiangrui Meng
Is it on a standalone server? There are several settings worth checking:

1) number of partitions, which should match the number of cores
2) driver memory (you can see it from the executor tab of the Spark
WebUI and set it with "--driver-memory 10g")
3) the version of Spark you were running
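
A rough sketch of points 1 and 2, assuming an existing SparkContext sc (the
path, core count, class name, and memory figure below are placeholders):

import org.apache.spark.mllib.util.MLUtils

// 1) one partition per core so every core gets work
val numCores = 16
val examples = MLUtils.loadLibSVMFile(sc, "hdfs:///data/sparse.libsvm")
  .repartition(numCores)
  .cache()
println("partitions = " + examples.partitions.size + ", examples = " + examples.count())

// 2) driver memory is set on the command line, e.g.:
//      spark-submit --driver-memory 10g --class MySVMApp my-app.jar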

Best,
Xiangrui

On Mon, Jul 14, 2014 at 12:14 PM, Srikrishna S  wrote:
> That is exactly the same error that I got. I am still having no success.
>
> Regards,
> Krishna
>
> On Mon, Jul 14, 2014 at 11:50 AM, crater  wrote:
>> Hi Krishna,
>>
>> Thanks for your help. Are you able to get your 29M data running yet? I fixed
>> the previous problem by setting a larger spark.akka.frameSize, but now I get
>> some other errors below. Did you get these errors before?
>>
>>
>> 14/07/14 11:32:20 ERROR TaskSchedulerImpl: Lost executor 1 on node7: remote
>> Akka client disassociated
>> 14/07/14 11:32:20 WARN TaskSetManager: Lost TID 20 (task 13.0:0)
>> 14/07/14 11:32:21 ERROR TaskSchedulerImpl: Lost executor 3 on node8: remote
>> Akka client disassociated
>> 14/07/14 11:32:21 WARN TaskSetManager: Lost TID 21 (task 13.0:1)
>> 14/07/14 11:32:23 ERROR TaskSchedulerImpl: Lost executor 6 on node3: remote
>> Akka client disassociated
>> 14/07/14 11:32:23 WARN TaskSetManager: Lost TID 22 (task 13.0:0)
>> 14/07/14 11:32:25 ERROR TaskSchedulerImpl: Lost executor 0 on node4: remote
>> Akka client disassociated
>> 14/07/14 11:32:25 WARN TaskSetManager: Lost TID 23 (task 13.0:1)
>> 14/07/14 11:32:26 ERROR TaskSchedulerImpl: Lost executor 5 on node1: remote
>> Akka client disassociated
>> 14/07/14 11:32:26 WARN TaskSetManager: Lost TID 24 (task 13.0:0)
>> 14/07/14 11:32:28 ERROR TaskSchedulerImpl: Lost executor 7 on node6: remote
>> Akka client disassociated
>> 14/07/14 11:32:28 WARN TaskSetManager: Lost TID 26 (task 13.0:0)
>> 14/07/14 11:32:28 ERROR TaskSetManager: Task 13.0:0 failed 4 times; aborting
>> job
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
>> to stage failure: Task 13.0:0 failed 4 times, most recent failure: TID 26 on
>> host node6 failed for unknown reason
>> Driver stacktrace:
>> at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at 
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>>
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9623.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: SparkR failed to connect to the master

2014-07-14 Thread cjwang
I tried installing the latest Spark 1.0.1 and SparkR couldn't find the master
either.  I restarted with Spark 0.9.1 and SparkR was able to find the
master.  So, there seemed to be something that changed after Spark 1.0.0.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-failed-to-connect-to-the-master-tp9359p9680.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: SQL + streaming

2014-07-14 Thread Tathagata Das
Can you make sure you are running locally on more than 1 local core? You
could set the master in the SparkConf as conf.setMaster("local[4]"). Then
see if there are jobs running on every batch of data in the Spark web UI
(running on localhost:4040). If you still don't get any output, first try
simply printing recRDD.count() in the foreachRDD (that is, first test Spark
Streaming). If you can get that to work, then I would test the Spark SQL
stuff.
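
A minimal sketch of that first step, reusing zkQuorum, group, topics, numThreads
and getRecord from the code above (the master and batch interval are just examples):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("KafkaSpark").setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(10))

val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
  .map(t => getRecord(t._2.split("#")))

// Step 1: verify the streaming side alone before adding Spark SQL.
recordsStream.foreachRDD { rdd =>
  println("records in this batch: " + rdd.count())
}

ssc.start()
ssc.awaitTermination()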

TD


On Mon, Jul 14, 2014 at 5:25 PM, hsy...@gmail.com  wrote:

> No errors but no output either... Thanks!
>
>
> On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Could you elaborate on what is the problem you are facing? Compiler
>> error? Runtime error? Class-not-found error? Not receiving any data from
>> Kafka? Receiving data but SQL command throwing error? No errors but no
>> output either?
>>
>> TD
>>
>>
>> On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com 
>> wrote:
>>
>>> Hi All,
>>>
>>> Couple days ago, I tried to integrate SQL and streaming together. My
>>> understanding is I can transform RDD from Dstream to schemaRDD and execute
>>> SQL on each RDD. But I got no luck
>>> Would you guys help me take a look at my code?  Thank you very much!
>>>
>>> object KafkaSpark {
>>>
>>>   def main(args: Array[String]): Unit = {
>>> if (args.length < 4) {
>>>   System.err.println("Usage: KafkaSpark   
>>> ")
>>>   System.exit(1)
>>> }
>>>
>>>
>>> val Array(zkQuorum, group, topics, numThreads) = args
>>> val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>> val ssc =  new StreamingContext(sparkConf, Seconds(10))
>>> val sc = new SparkContext(sparkConf)
>>> val sqlContext = new SQLContext(sc);
>>> //ssc.checkpoint("checkpoint")
>>>
>>> // Importing the SQL context gives access to all the SQL functions
>>> and implicit conversions.
>>> import sqlContext._
>>>
>>>
>>> val tt = Time(1)
>>> val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>> val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
>>> topicpMap).map(t => getRecord(t._2.split("#")))
>>>
>>> val result = recordsStream.foreachRDD((recRDD, tt)=>{
>>>   recRDD.registerAsTable("records")
>>>   val result = sql("select * from records")
>>>   println(result)
>>>   result.foreach(println)
>>> })
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>>   }
>>>
>>>   def getRecord(l:Array[String]):Record = {
>>> println("Getting the record")
>>> Record(l(0), l(1))}
>>> }
>>>
>>>
>>
>


Re: import org.apache.spark.streaming.twitter._ in Shell

2014-07-14 Thread Tathagata Das
I guess this is not clearly documented. At a high level, any class that is
in the package

org.apache.spark.streaming.XXX   where XXX is in { twitter, kafka, flume,
zeromq, mqtt }

is not available in the Spark shell.
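
For the "separate project" route, a minimal build.sbt sketch (the project name
and version numbers are placeholders):

name := "twitter-stream-app"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"         % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.0.1"
)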

I have added this to the larger JIRA of things-to-add-to-streaming-docs
https://issues.apache.org/jira/browse/SPARK-2419

Thanks for bringing this to attention.

TD


On Mon, Jul 14, 2014 at 5:53 PM, durin  wrote:

> Thanks. Can I see that a Class is not available in the shell somewhere in
> the
> API Docs or do I have to find out by trial and error?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/import-org-apache-spark-streaming-twitter-in-Shell-tp9665p9678.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: import org.apache.spark.streaming.twitter._ in Shell

2014-07-14 Thread Nicholas Chammas
On Mon, Jul 14, 2014 at 6:52 PM, Tathagata Das 
wrote:

> The twitter functionality is not available through the shell.
>

I've been processing Tweets live from the shell, though not for a long
time. That's how I uncovered the problem with the Twitter receiver not
deregistering, btw.

Did I misunderstand your comment?

Nick


Re: Error when testing with large sparse svm

2014-07-14 Thread Srikrishna S
I am running Spark 1.0.1 on a 5 node yarn cluster. I have set the
driver memory to 8G and executor memory to about 12G.

Regards,
Krishna


On Mon, Jul 14, 2014 at 5:56 PM, Xiangrui Meng  wrote:
> Is it on a standalone server? There are several settings worth checking:
>
> 1) number of partitions, which should match the number of cores
> 2) driver memory (you can see it from the executor tab of the Spark
> WebUI and set it with "--driver-memory 10g")
> 3) the version of Spark you were running
>
> Best,
> Xiangrui
>
> On Mon, Jul 14, 2014 at 12:14 PM, Srikrishna S  
> wrote:
>> That is exactly the same error that I got. I am still having no success.
>>
>> Regards,
>> Krishna
>>
>> On Mon, Jul 14, 2014 at 11:50 AM, crater  wrote:
>>> Hi Krishna,
>>>
>>> Thanks for your help. Are you able to get your 29M data running yet? I fixed
>>> the previous problem by setting a larger spark.akka.frameSize, but now I get
>>> some other errors below. Did you get these errors before?
>>>
>>>
>>> 14/07/14 11:32:20 ERROR TaskSchedulerImpl: Lost executor 1 on node7: remote
>>> Akka client disassociated
>>> 14/07/14 11:32:20 WARN TaskSetManager: Lost TID 20 (task 13.0:0)
>>> 14/07/14 11:32:21 ERROR TaskSchedulerImpl: Lost executor 3 on node8: remote
>>> Akka client disassociated
>>> 14/07/14 11:32:21 WARN TaskSetManager: Lost TID 21 (task 13.0:1)
>>> 14/07/14 11:32:23 ERROR TaskSchedulerImpl: Lost executor 6 on node3: remote
>>> Akka client disassociated
>>> 14/07/14 11:32:23 WARN TaskSetManager: Lost TID 22 (task 13.0:0)
>>> 14/07/14 11:32:25 ERROR TaskSchedulerImpl: Lost executor 0 on node4: remote
>>> Akka client disassociated
>>> 14/07/14 11:32:25 WARN TaskSetManager: Lost TID 23 (task 13.0:1)
>>> 14/07/14 11:32:26 ERROR TaskSchedulerImpl: Lost executor 5 on node1: remote
>>> Akka client disassociated
>>> 14/07/14 11:32:26 WARN TaskSetManager: Lost TID 24 (task 13.0:0)
>>> 14/07/14 11:32:28 ERROR TaskSchedulerImpl: Lost executor 7 on node6: remote
>>> Akka client disassociated
>>> 14/07/14 11:32:28 WARN TaskSetManager: Lost TID 26 (task 13.0:0)
>>> 14/07/14 11:32:28 ERROR TaskSetManager: Task 13.0:0 failed 4 times; aborting
>>> job
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
>>> to stage failure: Task 13.0:0 failed 4 times, most recent failure: TID 26 on
>>> host node6 failed for unknown reason
>>> Driver stacktrace:
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at 
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>>> at scala.Option.foreach(Option.scala:236)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
>>> at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context: 
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9623.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
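
For reference, a hedged sketch of how the settings discussed in this thread
might be applied in code. The frame-size value, object name, and partition
count are placeholders, not values taken from the thread; driver and executor
memory are passed on the spark-submit command line (e.g. --driver-memory 8g
--executor-memory 12g), not set here:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.util.MLUtils

object SparseSvmTuningSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparseSvmTuningSketch")
      // Raise the Akka frame size (in MB) if task results or weight updates
      // exceed the 10 MB default, as mentioned earlier in the thread.
      .set("spark.akka.frameSize", "100")
    val sc = new SparkContext(conf)

    // Match the number of partitions roughly to the cores available to the executors.
    val data = MLUtils.loadLibSVMFile(sc, args(0)).repartition(sc.defaultParallelism)
    println(s"partitions = ${data.partitions.length}, examples = ${data.count()}")
    sc.stop()
  }
}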


Re: Ideal core count within a single JVM

2014-07-14 Thread Matei Zaharia
I see, so here might be the problem. With more cores, there's less memory 
available per core, and now many of your threads are doing external hashing 
(spilling data to disk), as evidenced by the calls to 
ExternalAppendOnlyMap.spill. Maybe with 10 threads, there was enough memory per 
task to do all its hashing there. It's true though that these threads appear to 
be CPU-bound, largely due to Java Serialization. You could get this to run 
quite a bit faster using Kryo. However that won't eliminate the issue of 
spilling here.

Matei

On Jul 14, 2014, at 1:02 PM, lokesh.gidra  wrote:

> I am only playing with 'N' in local[N]. I thought that by increasing N, Spark
> would automatically use more parallel tasks. Isn't that so? Can you please tell
> me how I can modify the number of parallel tasks?
> 
> For me, there are hardly any threads in BLOCKED state in jstack output. In
> 'top' I see my application consuming all the 48 cores all the time with
> N=48.
> 
> I am attaching two jstack outputs that I took while the application was
> running.
> 
> 
> Lokesh
> 
> lessoutput3.lessoutput3
> 
>   
> lessoutput4.lessoutput4
> 
>   
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Ideal-core-count-within-a-single-JVM-tp9566p9640.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Ideal core count within a single JVM

2014-07-14 Thread Matei Zaharia
BTW you can see the number of parallel tasks in the application UI 
(http://localhost:4040) or in the log messages (e.g. when it says progress: 
17/20, that means there are 20 tasks total and 17 are done). Spark will try to 
use at least one task per core in local mode so there might be more of them 
here, but if your file is big it will also have at least one task per 32 MB 
block of the file.

Matei

On Jul 14, 2014, at 6:39 PM, Matei Zaharia  wrote:

> I see, so here might be the problem. With more cores, there's less memory 
> available per core, and now many of your threads are doing external hashing 
> (spilling data to disk), as evidenced by the calls to 
> ExternalAppendOnlyMap.spill. Maybe with 10 threads, there was enough memory 
> per task to do all its hashing there. It's true though that these threads 
> appear to be CPU-bound, largely due to Java Serialization. You could get this 
> to run quite a bit faster using Kryo. However that won't eliminate the issue 
> of spilling here.
> 
> Matei
> 
> On Jul 14, 2014, at 1:02 PM, lokesh.gidra  wrote:
> 
>> I am only playing with 'N' in local[N]. I thought that by increasing N, Spark
>> would automatically use more parallel tasks. Isn't that so? Can you please tell
>> me how I can modify the number of parallel tasks?
>> 
>> For me, there are hardly any threads in BLOCKED state in jstack output. In
>> 'top' I see my application consuming all the 48 cores all the time with
>> N=48.
>> 
>> I am attaching two jstack outputs that I took while the application was
>> running.
>> 
>> 
>> Lokesh
>> 
>> lessoutput3.lessoutput3
>> 
>>   
>> lessoutput4.lessoutput4
>> 
>>   
>> 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Ideal-core-count-within-a-single-JVM-tp9566p9640.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
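
A hedged sketch of controlling the task count explicitly rather than relying on
local[N] alone; the master string, input path, object name, and counts are
placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object ParallelismSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[48]")
      .setAppName("ParallelismSketch")
      // Default number of tasks used by shuffle operations when no explicit
      // count is given.
      .set("spark.default.parallelism", "96")
    val sc = new SparkContext(conf)

    // textFile accepts a minimum number of partitions; each partition is one task.
    val lines = sc.textFile(args(0), 96)

    // Shuffle operations also accept an explicit number of reduce tasks.
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 96)
    println(counts.count())
    sc.stop()
  }
}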



SPARK_WORKER_PORT (standalone cluster)

2014-07-14 Thread jay vyas
Hi spark !

What is the purpose of the randomly assigned SPARK_WORKER_PORT?

From the documentation it says to "join a cluster", but it's not clear to me
how a random port could be used to communicate with other members of a
Spark pool.

This question might be grounded in my ignorance ... if so please just point
me to the right documentation if I'm missing something obvious :)

thanks !
-- 
jay vyas


jsonRDD: NoSuchMethodError

2014-07-14 Thread SK
Hi,

I am using Spark 1.0.1. I am using the following piece of code to parse a
json file. It is based on the code snippet in the SparkSQL programming
guide. However, the compiler outputs an error stating: 

java.lang.NoSuchMethodError:
org.apache.spark.sql.SQLContext.jsonRDD(Lorg/apache/spark/rdd/RDD;)Lorg/apache/spark/sql/SchemaRDD;

I get a similar error for jsonFile() as well. I have included the spark-sql
1.0.1 jar when building my program using sbt. What is the right library to
import for jsonRDD and jsonFile?

thanks

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.json

object SQLExample{
   def main(args : Array[String]) {

  val sparkConf = new SparkConf().setAppName("JsonExample")
  val sc = new SparkContext(sparkConf)
  val sqlc = new org.apache.spark.sql.SQLContext(sc)

  val jrdd = sc.textFile(args(0)).filter(r=> r.trim != "")
  val data = sqlc.jsonRDD(jrdd)

  data.printSchema()
   }
}




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/jsonRDD-NoSuchMethodError-tp9688.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark1.0.1 catalyst transform filter not push down

2014-07-14 Thread Victor Sheng
I use queryPlan.queryExecution.analyzed to get the logical plan.

It works.

And what you explained to me is very useful.

Thank you very much.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-catalyst-transform-filter-not-push-down-tp9599p9689.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
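
A small hedged sketch of the access path mentioned above, against the 1.0.x
API; it assumes a SQLContext named sqlContext and a registered table "records",
both of which are placeholders:

// Assumes a SQLContext named sqlContext and a registered table "records".
val queryPlan = sqlContext.sql("SELECT * FROM records WHERE key = 'a'")

// The analyzed logical plan, after catalog and attribute resolution.
val analyzed = queryPlan.queryExecution.analyzed
println(analyzed)

// The physical plan actually chosen for execution, for comparison.
println(queryPlan.queryExecution.executedPlan)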


Re: ---cores option in spark-shell

2014-07-14 Thread cjwang
They don't work in the new 1.0.1 either.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/cores-option-in-spark-shell-tp6809p9690.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: jsonRDD: NoSuchMethodError

2014-07-14 Thread Michael Armbrust
Have you upgraded the cluster where you are running this to 1.0.1 as
well? A NoSuchMethodError almost always means that the class files available
at runtime are different from those that were there when you compiled your
program.


On Mon, Jul 14, 2014 at 7:06 PM, SK  wrote:

> Hi,
>
> I am using Spark 1.0.1. I am using the following piece of code to parse a
> json file. It is based on the code snippet in the SparkSQL programming
> guide. However, the compiler outputs an error stating:
>
> java.lang.NoSuchMethodError:
>
> org.apache.spark.sql.SQLContext.jsonRDD(Lorg/apache/spark/rdd/RDD;)Lorg/apache/spark/sql/SchemaRDD;
>
> I get a similar error for jsonFile() as well. I have included the spark-sql
> 1.0.1 jar when building my program using sbt. What is the right library to
> import for jsonRDD and jsonFile?
>
> thanks
>
> import org.apache.spark._
> import org.apache.spark.sql._
> import org.apache.spark.sql.json
>
> object SQLExample{
>def main(args : Array[String]) {
>
>   val sparkConf = new SparkConf().setAppName("JsonExample")
>   val sc = new SparkContext(sparkConf)
>   val sqlc = new org.apache.spark.sql.SQLContext(sc)
>
>   val jrdd = sc.textFile(args(0)).filter(r=> r.trim != "")
>   val data = sqlc.jsonRDD(jrdd)
>
>   data.printSchema()
>}
> }
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/jsonRDD-NoSuchMethodError-tp9688.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
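
A hedged sketch of an sbt build that keeps the compile-time and runtime Spark
versions aligned, which is the usual fix for this class of NoSuchMethodError;
the Scala version and the "provided" scoping are assumptions, not details from
the thread:

// build.sbt
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Compile against the same Spark version that is deployed on the cluster.
  "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.0.1"
)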


Re: ---cores option in spark-shell

2014-07-14 Thread Andrew Or
Yes, the documentation is actually a little outdated. We will get around to
fixing it shortly. Please use --driver-cores or --executor-cores instead.


2014-07-14 19:10 GMT-07:00 cjwang :

> They don't work in the new 1.0.1 either.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/cores-option-in-spark-shell-tp6809p9690.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
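
For standalone clusters there is also a configuration-property route; a hedged
sketch follows (the value is a placeholder, and which flags or properties apply
depends on the cluster manager and Spark version):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("CoreCapExample")
  // Cap the total number of cores this application may claim on a
  // standalone (or Mesos) cluster; local[N] controls cores in local mode.
  .set("spark.cores.max", "8")
val sc = new SparkContext(conf)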


Re: import org.apache.spark.streaming.twitter._ in Shell

2014-07-14 Thread Tathagata Das
Did you make any updates to the Spark version recently, after which you noticed
this problem? Because if you were using Spark 0.8 or below, then twitter
would have worked in the Spark shell. In Spark 0.9, we moved those
dependencies out of core Spark so that they could be updated more freely
without pulling dependency-related concerns into the core of Spark Streaming.

TD


On Mon, Jul 14, 2014 at 6:29 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> On Mon, Jul 14, 2014 at 6:52 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> The twitter functionality is not available through the shell.
>>
>
> I've been processing Tweets live from the shell, though not for a long
> time. That's how I uncovered the problem with the Twitter receiver not
> deregistering, btw.
>
> Did I misunderstand your comment?
>
> Nick
>


Re: import org.apache.spark.streaming.twitter._ in Shell

2014-07-14 Thread Nicholas Chammas
If we're talking about the issue you captured in SPARK-2464, then it was a
newly launched EC2 cluster on 1.0.1.


On Mon, Jul 14, 2014 at 10:48 PM, Tathagata Das  wrote:

> Did you make any updates to the Spark version recently, after which you
> noticed this problem? Because if you were using Spark 0.8 or below, then
> twitter would have worked in the Spark shell. In Spark 0.9, we moved those
> dependencies out of core Spark so that they could be updated more freely
> without pulling dependency-related concerns into the core of Spark Streaming.
>
> TD
>
>
> On Mon, Jul 14, 2014 at 6:29 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> On Mon, Jul 14, 2014 at 6:52 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> The twitter functionality is not available through the shell.
>>>
>>
>> I've been processing Tweets live from the shell, though not for a long
>> time. That's how I uncovered the problem with the Twitter receiver not
>> deregistering, btw.
>>
>> Did I misunderstand your comment?
>>
>> Nick
>>
>
>


RACK_LOCAL Tasks Failed to finish

2014-07-14 Thread 洪奇
Hi all,

When running GraphX applications on Spark, the task scheduler may schedule
some tasks to be executed on RACK_LOCAL executors, but in that case the tasks
hang, repeatedly printing the following log information:
14-07-14 15:59:14 INFO [Executor task launch worker-6] 
BlockFetcherIterator$BasicBlockFetcherIterator: Started 1 remote fetches in 3 ms
14-07-14 15:59:14 INFO [Executor task launch worker-1] BlockManager: Found 
block rdd_29_38 locally
14-07-14 15:59:14 INFO [Executor task launch worker-1] BlockManager: Found 
block rdd_29_38 locally
14-07-14 15:59:14 INFO [Executor task launch worker-1] BlockManager: Found 
block rdd_29_38 locally
14-07-14 15:59:14 INFO [Executor task launch worker-1] 
BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, 
targetRequestSize: 10066329
14-07-14 15:59:14 INFO [Executor task launch worker-1] 
BlockFetcherIterator$BasicBlockFetcherIterator: Getting 300 non-empty blocks 
out of 300 blocks
14-07-14 15:59:14 INFO [Executor task launch worker-1] 
BlockFetcherIterator$BasicBlockFetcherIterator: Started 1 remote fetches in 3 ms
14-07-14 15:59:14 INFO [Executor task launch worker-0] 
BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, 
targetRequestSize: 10066329
14-07-14 15:59:14 INFO [Executor task launch worker-0] 
BlockFetcherIterator$BasicBlockFetcherIterator: Getting 300 non-empty blocks 
out of 300 blocks
14-07-14 15:59:14 INFO [Executor task launch worker-0] 
BlockFetcherIterator$BasicBlockFetcherIterator: Started 1 remote fetches in 3 ms
14-07-14 15:59:14 INFO [Executor task launch worker-2] BlockManager: Found 
block rdd_29_2 locally
14-07-14 15:59:14 INFO [Executor task launch worker-2] BlockManager: Found 
block rdd_29_2 locally
14-07-14 15:59:14 INFO [Executor task launch worker-2] BlockManager: Found 
block rdd_29_2 locally
BlockManager's `get` and `getMultiple` are being called continually, and I
don't know why. Are there some RDDs being recomputed?

Thanks for your help.

Qiping


