Re: Spark / Thrift / ODBC connectivity

2014-08-29 Thread Cheng Lian
You can use the Thrift server to access Hive tables that are located in the legacy
Hive warehouse and/or those generated by Spark SQL. Simba provides a Spark
SQL ODBC driver that enables applications like Tableau, but right now I'm
not 100% sure whether the driver has been officially released yet.


On Thu, Aug 28, 2014 at 9:42 PM, Denny Lee  wrote:

> I’m currently using the Spark 1.1 branch and have been able to get the
> Thrift service up and running.  The quick questions were whether I should
> be able to use the Thrift service to connect to Spark SQL-generated tables
> and/or Hive tables?
>
> As well, by any chance do we have any documents that point to how we can
> connect something like Tableau to Spark SQL Thrift - similar to the SAP
> ODBC connectivity http://www.saphana.com/docs/DOC-472?
>
> Thanks!
> Denny
>
>


how can I get the number of cores

2014-08-29 Thread Kevin Jung
Hi all,
The Spark web UI gives me information about total cores and used cores.
I want to get this information programmatically.
How can I do this?

Thanks
Kevin






Re: Spark SQL : how to find element where a field is in a given set

2014-08-29 Thread Michael Armbrust
To pass a list to a variadic function you can use the type ascription :_*

For example:

val longList = Seq[Expression]("a", "b", ...)
table("src").where('key in (longList: _*))

Also, note that I had to explicitly specify Expression as the type
parameter of Seq to ensure that the compiler converts "a" and "b" into
Spark SQL expressions.
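
The same ": _*" ascription works for any Scala variadic function; a tiny
standalone illustration (plain Scala, nothing Spark-specific):

def makeSet(items: String*): Set[String] = items.toSet

val names = Seq("foo", "bar", "baz")
makeSet(names: _*)      // the ": _*" ascription expands the Seq into the varargs parameter
makeSet("foo", "bar")   // same as passing the elements directly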




On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa 
wrote:

> Ok, but what if I have a long list? Do I need to hard-code every element of
> my list like this, or is there a function that translates a list into a
> tuple?
>
>
> On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust 
> wrote:
>
>> You don't need the Seq, as in is a variadic function.
>>
>> personTable.where('name in ("foo", "bar"))
>>
>>
>>
>> On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa 
>> wrote:
>>
>>> Hi all,
>>>
>>> What is the expression that I should use with the Spark SQL DSL if I need to
>>> retrieve
>>> data with a field in a given set?
>>> For example :
>>>
>>> I have the following schema
>>>
>>> case class Person(name: String, age: Int)
>>>
>>> And I need to do something like :
>>>
>>> personTable.where('name in Seq("foo", "bar")) ?
>>>
>>>
>>> Cheers.
>>>
>>>
>>> Jaonary
>>>
>>
>>
>


Re: u'' notation with pyspark output data

2014-08-29 Thread Davies Liu
u'14.0' means a unicode string; you can convert it into a str with
u'14.0'.encode('utf8'), or you can convert it into a float with
float(u'14.0').

Davies

On Thu, Aug 28, 2014 at 11:22 PM, Oleg Ruchovets  wrote:
> Hi ,
>   I am working with pyspark and doing simple aggregation
>
>
> def doSplit(x):
>     y = x.split(',')
>     if(len(y)==3):
>         return y[0], (y[1], y[2])
>
> counts = lines.map(doSplit).groupByKey()
> output = counts.collect()
>
> Iterating over the output I get data in the format u'1385501280', u'14.0',
> but I actually need to work with 14.0 instead of u'14.0' and 1385501280
> instead of u'1385501280'.
>
> Question:
>    how can I get the actual data without the u'' notation?
>
>
> Thanks
> Oleg.
>




Ensuring object in spark streaming runs on specific node

2014-08-29 Thread Filip Andrei
Say you have a spark streaming setup such as

JavaReceiverInputDStream<...> rndLists = jssc.receiverStream(new
JavaRandomReceiver(...));

rndLists.map(new NeuralNetMapper(...))
.foreach(new JavaSyncBarrier(...));

Is there any way of ensuring that, say, a JavaRandomReceiver and
JavaSyncBarrier get distributed to the same node ? Or is this even a
question that makes sense ?

Some information as to how spark-streaming distributes work across a cluster
would also be greatly appreciated.

( i've also asked this question on stackoverflow at
http://stackoverflow.com/questions/25564356/ensuring-object-in-spark-streaming-runs-on-specific-node
)






Running Spark On Yarn without Spark-Submit

2014-08-29 Thread Archit Thakur
Hi,

My requirement is to run Spark on Yarn without using the script
spark-submit.

I have a servlet and a Tomcat server. As and when a request comes, it creates
a new SparkContext and keeps it alive for further requests. I am setting my
master in SparkConf

as sparkConf.setMaster("yarn-cluster")

but the request is stuck indefinitely.

This works when I set
sparkConf.setMaster("yarn-client")

I am not sure why it is not launching the job in yarn-cluster mode.

Any thoughts?

Thanks and Regards,
Archit Thakur.


Re: how to specify columns in groupby

2014-08-29 Thread MEETHU MATHEW
Thank you Yanbo for the reply..

I have another query related to cogroup. I want to iterate over the results of
the cogroup operation.

My code is:
grp = RDD1.cogroup(RDD2)
map((lambda (x,y): (x, list(y[0]), list(y[1]))), list(grp))
My result looks like :

[((u'764', u'20140826'), [0.70146274566650391], [ ]),
 ((u'863', u'20140826'), [0.368011474609375], [ ]),
 ((u'9571520', u'20140826'), [0.0046129226684570312], [0.60009])]
 
When I do one more cogroup operation like 

grp1 = grp.cogroup(RDD3)

I am not able to see the results. All my RDDs are of the form ((x,y),z). Can
somebody help me solve this?

Thanks & Regards, 
Meethu M


On Thursday, 28 August 2014 5:59 PM, Yanbo Liang  wrote:
 


For your reference:

val d1 = textFile.map(line => {
  val fields = line.split(",")
  ((fields(0), fields(1)), fields(2).toDouble)
})

val d2 = d1.reduceByKey(_+_)
d2.foreach(println)




2014-08-28 20:04 GMT+08:00 MEETHU MATHEW :

> Hi all,
>
> I have an RDD which has values in the format "id,date,cost".
>
> I want to group the elements based on the id and date columns and get the sum
> of the cost for each group.
>
> Can somebody tell me how to do this?
>
> Thanks & Regards,
> Meethu M

Re: Spark SQL : how to find element where a field is in a given set

2014-08-29 Thread Jaonary Rabarisoa
Still not working for me. I got a compilation error : *value in is not a
member of Symbol.* Any ideas ?


On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust 
wrote:

> To pass a list to a variadic function you can use the type ascription :_*
>
> For example:
>
> val longList = Seq[Expression]("a", "b", ...)
> table("src").where('key in (longList: _*))
>
> Also, note that I had to explicitly specify Expression as the type
> parameter of Seq to ensure that the compiler converts "a" and "b" into
> Spark SQL expressions.
>
>
>
>
> On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa 
> wrote:
>
>> ok, but what if I have a long list do I need to hard code like this every
>> element of my list of is there a function that translate a list into a
>> tuple ?
>>
>>
>> On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust > > wrote:
>>
>>> You don't need the Seq, as in is a variadic function.
>>>
>>> personTable.where('name in ("foo", "bar"))
>>>
>>>
>>>
>>> On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa 
>>> wrote:
>>>
 Hi all,

 What is the expression that I should use with spark sql DSL if I need
 to retreive
 data with a field in a given set.
 For example :

 I have the following schema

 case class Person(name: String, age: Int)

 And I need to do something like :

 personTable.where('name in Seq("foo", "bar")) ?


 Cheers.


 Jaonary

>>>
>>>
>>
>


Re: Running Spark On Yarn without Spark-Submit

2014-08-29 Thread Archit Thakur
including user@spark.apache.org.


On Fri, Aug 29, 2014 at 2:03 PM, Archit Thakur 
wrote:

> Hi,
>
> My requirement is to run Spark on Yarn without using the script
> spark-submit.
>
> I have a servlet and a tomcat server. As and when request comes, it
> creates a new SC and keeps it alive for the further requests, I am setting
> my master in sparkConf
>
> as sparkConf.setMaster("yarn-cluster")
>
> but the request is stuck indefinitely.
>
> This works when I set
> sparkConf.setMaster("yarn-client")
>
> I am not sure, why is it not launching job in yarn-cluster mode.
>
> Any thoughts?
>
> Thanks and Regards,
> Archit Thakur.
>
>
>
>


Re: How to debug this error?

2014-08-29 Thread Yanbo Liang
It's not allowed to use an RDD inside a map function.
RDDs can only be operated on at the driver of a Spark program.
In your case, the group RDD can't be found on the executors.

I guess you want to implement a subquery-like operation; try using
RDD.intersection() or join().
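
A minimal sketch of the join-based rewrite, with the types taken from the
original post (an illustration, not tested against that code):

// Assumes, as in the original post, c: RDD[String] and
// group: RDD[(String, Iterable[String])].
val keyed = c.map(k => (k, ()))    // turn the plain keys into a pair RDD

// join keeps only the groups whose key appears in c; no RDD is nested
// inside another RDD's closure, so it runs on the executors without NPEs.
val d = group.join(keyed).mapValues { case (values, _) => values }

d.first()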


2014-08-29 12:43 GMT+08:00 Gary Zhao :

> Hello
>
> I'm new to Spark and playing around, but saw the following error. Could
> anyone to help on it?
>
> Thanks
> Gary
>
>
>
> scala> c
> res15: org.apache.spark.rdd.RDD[String] = FlatMappedRDD[7] at flatMap at
> :23
>
> scala> group
> res16: org.apache.spark.rdd.RDD[(String, Iterable[String])] =
> MappedValuesRDD[5] at groupByKey at :19
>
> val d = c.map(i=>group.filter(_._1 ==i ))
>
> d.first
>
> 14/08/29 04:39:33 INFO TaskSchedulerImpl: Cancelling stage 28
> 14/08/29 04:39:33 INFO DAGScheduler: Failed to run first at :28
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 28.0:180 failed 4 times, most recent failure: Exception failure in TID 3605
> on host mcs-spark-slave1-staging.snc1: java.lang.NullPointerException
> org.apache.spark.rdd.RDD.filter(RDD.scala:282)
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:25)
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:25)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> scala.collection.AbstractIterator.to(Iterator.scala:1157)
>
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
> org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
>
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
>
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> org.apache.spark.scheduler.Task.run(Task.scala:51)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>  at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>  at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>  at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>  at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
> at scala.Option.foreach(Option.scala:236)
>  at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>  at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>


RE: The concurrent model of spark job/stage/task

2014-08-29 Thread linkpatrickliu
Hi, 
I think an example will help illustrate the model better.
/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md"
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).filter(line => line.contains("c")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
The example's DAG graph corresponds to your graph. Let's see how it works:

1. val sc = new SparkContext   // This line creates the SparkContext (which is the driver)

2. val numAs = logData.filter(line => line.contains("a")).filter(line => line.contains("c")).count()
   This is a job with 2 transformations and 1 action.

3. val numBs = logData.filter(line => line.contains("b")).count()
   This is another job with 1 transformation and 1 action.

Remember Scala's LAZY calculation strategy.
The job numAs will be calculated by invoking the count() method. It has 3
stages: FilteredRDD(1) <- FilteredRDD(2) <- RDD.count()

(1) RDD.count() will submit it as the Final Stage to the DAGScheduler.
(2) The DAGScheduler analyses the dependency chain and asks the RDD's parent
FilteredRDD(2) to be computed first, and FilteredRDD(2) asks its parent
FilteredRDD(1) to be computed first. FilteredRDD(1) is the first, so it will be computed.
(3) Then the DAGScheduler wraps FilteredRDD(1) as a TaskSet and submits the TaskSet to
TaskSchedulerImpl.
(4) Then TaskSchedulerImpl schedules the TaskSet by the "FIFO" or "FAIR" strategy.
(5) The tasks in the TaskSet are distributed to different Executors.
(6) After all the tasks of this TaskSet have finished, this Stage is marked finished.
(The RDD will be cached by the BlockStore, so RDD data can be shared in this SparkContext.
If you have a job numCs,
    val numCs = logData.filter(line => line.contains("a")).filter(line => line.contains("d")).count()
the first filter(line => line.contains("a")) can reuse the RDD data computed in numAs.)
(7) Then FilteredRDD(2) will be computed, then RDD.count().
(8) Finally you have the result for numAs.

I think you now understand the submit, schedule, and run process. Let's look at the questions:

1. Each DAG graph is related to 1 action. You can write multiple actions in a
Spark application. If you want these actions to run simultaneously, you have
to submit these actions from different threads (see the sketch below).
2. I think you should pay attention to the "FIFO" or "FAIR" scheduler strategy. If the
first action is too large, maybe the second action will be starved.
3. I think the question is how to persist the RDD data to local disk. You could use
saveAsTextFile(path) or saveAsSequenceFile(path) to persist RDD data to local disk.
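
A minimal sketch of point 1, reusing the SimpleApp example above: submit the two
independent actions from separate threads (here via scala.concurrent Futures) so
their stages can be scheduled concurrently; Spark's scheduler is thread-safe for
job submission.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Two actions started from different threads can have their stages scheduled
// concurrently, subject to the FIFO/FAIR scheduler and available cores.
val futureA = Future {
  logData.filter(_.contains("a")).filter(_.contains("c")).count()
}
val futureB = Future {
  logData.filter(_.contains("b")).count()
}

val numAs = Await.result(futureA, Duration.Inf)
val numBs = Await.result(futureB, Duration.Inf)
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))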
Hope this will help you.
Best regards,
Patrick Liu

Date: Thu, 28 Aug 2014 23:34:29 -0700
From: ml-node+s1001560n13104...@n3.nabble.com
To: linkpatrick...@live.com
Subject: Re: RE: The concurrent model of spark job/stage/task




hi, dear
  Thanks for the response. Some comments below. And yes, I am using Spark on
YARN.

1. The release doc of Spark says multiple jobs can be submitted in one
application if the jobs (actions) are submitted by different threads. I wrote some
Java thread code in the driver, one action in each thread, and the stages run
concurrently, which is observed on the stages UI. In my understanding the
DAGScheduler generates a different graph for each action. Not sure if this is correct
or not. Originally I was hoping the SparkContext could generate different jobs for
non-relevant actions, but I never managed to make that work.

2. If the DAGScheduler generates a graph as below, can 1 and 2 run concurrently?

3. I want to retrieve the original data out of the RDD and have other computation
on the data, like getting the value of temperature or other data, and working on them.


[hidden email]
From: [hidden email]
Date: 2014-08-29 14:01
To: [hidden email]
Subject: RE: The concurrent model of spark job/stage/task

Hi,
Please see the answers following each question. If there's any mistake, please
let me know. Thanks!
I am not sure which mode you are running, so I will assume you are using the
spark-submit script to submit Spark applications to a Spark
cluster (spark-standalone or YARN).

1. How to start 2 or more jobs in one Spark driver, in Java code: I wrote 2
actions in the code, but the jobs are still staged at index 0, 1, 2, 3..., so it
looks like they run sequentially.
A Spark application is a job; you init the application by creating
a SparkContext. The SparkContext will init the driver program for you. So if you
want to run multiple jobs simultaneously, you have to split the jobs into
different applications, and submit each of them.
The driver program is like an ApplicationMaster in YARN. It translates the spark
application int

Re: Spark Hive max key length is 767 bytes

2014-08-29 Thread arthur.hk.c...@gmail.com
Hi,


Tried the same thing in HIVE directly without issue:

HIVE:
hive> create table test_datatype2 (testbigint bigint );
OK
Time taken: 0.708 seconds

hive> drop table test_datatype2;
OK
Time taken: 23.272 seconds



Then tried again in SPARK:
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
14/08/29 19:33:52 INFO Configuration.deprecation: 
mapred.reduce.tasks.speculative.execution is deprecated. Instead, use 
mapreduce.reduce.speculative
hiveContext: org.apache.spark.sql.hive.HiveContext = 
org.apache.spark.sql.hive.HiveContext@395c7b94

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
res0: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[0] at RDD at SchemaRDD.scala:104
== Query Plan ==


scala> hiveContext.hql("drop table test_datatype3")

14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while 
adding/validating class(es) : Specified key was too long; max key length is 767 
bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of 
org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in no 
possible candidates
Error(s) were found while auto-creating/validating the datastore for classes. 
The errors are printed in the log, and are attached to this exception.
org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found while 
auto-creating/validating the datastore for classes. The errors are printed in 
the log, and are attached to this exception.
at 
org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)


Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified 
key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
"embedded-only" so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so 
does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
"embedded-only" so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so 
does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
"embedded-only" so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so 
does not have its own datastore table.
14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown while 
adding/validating class(es) : Specified key was too long; max key length is 767 
bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)


Can anyone please help?

Regards
Arthur


On 29 Aug, 2014, at 12:47 pm, arthur.hk.c...@gmail.com 
 wrote:

> (Please ignore if duplicated) 
> 
> 
> Hi,
> 
> I use Spark 1.0.2 with Hive 0.13.1
> 
> I have already set the hive mysql database to latine1; 
> 
> mysql:
> alter database hive character set latin1;
> 
> Spark:
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> scala> hiveContext.hql("create table test_datatype1 (testbigint bigint )")
> scala> hiveContext.hql("drop table test_datatype1")
> 
> 
> 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
> "embedded-only" so does not have its own datastore table.
> 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" 
> so does not have its own datastore table.
> 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as 
> "embedded-only" so does not have its own datastore table.
> 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" 
> so does not have its own datastore table.
> 14/08/29 12:31:59 ERROR DataNucleus.Datastore: An exception was t

Re: how to filter value in spark

2014-08-29 Thread marylucy
I see, it works well, thank you!!!

But in the following situation, how do I do it?

var a = sc.textFile("/sparktest/1/").map((_,"a"))
var b = sc.textFile("/sparktest/2/").map((_,"b"))
How do I get (3,"a") and (4,"a")?
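
One way to do that (a sketch, assuming a and b as defined above): join on the
key, which keeps only the keys present in both files, then keep the tag from a.
Unlike intersection, join preserves the values.

// a: RDD[(String, String)] tagged "a", b: RDD[(String, String)] tagged "b"
val common = a.join(b).map { case (k, (tagA, _)) => (k, tagA) }
common.collect().foreach(println)   // e.g. (3,a), (4,a)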


On Aug 28, 2014, at 19:54, "Matthew Farrellee"  wrote:

> On 08/28/2014 07:20 AM, marylucy wrote:
>> fileA=1 2 3 4  one number a line,save in /sparktest/1/
>> fileB=3 4 5 6  one number a line,save in /sparktest/2/
>> I want to get 3 and 4
>> 
>> var a = sc.textFile("/sparktest/1/").map((_,1))
>> var b = sc.textFile("/sparktest/2/").map((_,1))
>> 
>> a.filter(param=>{b.lookup(param._1).length>0}).map(_._1).foreach(println)
>> 
>> Error throw
>> Scala.MatchError:Null
>> PairRDDFunctions.lookup...
> 
> the issue is nesting of the b rdd inside a transformation of the a rdd
> 
> consider using intersection, it's more idiomatic
> 
> a.intersection(b).foreach(println)
> 
> but note that intersection will remove duplicates
> 
> best,
> 
> 
> matt
> 


Re: Running Spark On Yarn without Spark-Submit

2014-08-29 Thread Chester @work
Archit,
 We are using yarn-cluster mode, and calling Spark via the Client class
directly from the servlet server. It works fine.
 As for establishing a communication channel for further requests:
it should be possible with yarn-client, but not with yarn-cluster. In yarn-client
mode, the Spark driver is outside the YARN cluster, so it can issue more
commands. In yarn-cluster mode, all programs including the Spark driver run
inside the YARN cluster. There is no communication channel with the client
until the job finishes.

If your job is to keep the SparkContext alive and wait for other commands, then
it should wait forever.

I am actually working on some improvements on this and experimenting in our
product; I will create PRs when I feel comfortable with the solution:

1) change Client API to allow the caller to know yarn app resource capacity 
before passing arguments
2) add YarnApplicationListener to the Client 
3) provide communication channel between application and spark Yarn client in 
cluster. 

The #1) is not directly related to the communication discussed here

#2) allows the application to have application life cycle callbacks for app
start, end, in-progress, failure, etc., along with YARN resource allocations.

I changed #1 and #2 in forked spark, and it's worked well in cdh5, and I am 
testing against 2.0.5-alpha as well. 

For #3) I did not change in spark currently, as I am not sure the best approach 
yet. I put the change in the application runner which launch the spark yarn 
client in the cluster. 

The runner in the yarn cluster gets the application's host and port information from the
passed configuration (args), then creates an Akka actor using the Spark context
actor system and sends a handshake message to the caller outside the cluster;
after that you have two-way communication.

With this approach, I can send spark listener call backs to the app, error 
messages, app level messages etc. 

The runner inside the cluster can also receive requests from outside cluster 
such as stop. 

We are not sure the Akka approach is the best, so I am still experimenting with it.
So far it does what we want.

Hope this helps

Chester


Sent from my iPhone

> On Aug 29, 2014, at 2:36 AM, Archit Thakur  wrote:
> 
> including user@spark.apache.org.
> 
> 
>> On Fri, Aug 29, 2014 at 2:03 PM, Archit Thakur  
>> wrote:
>> Hi,
>> 
>> My requirement is to run Spark on Yarn without using the script spark-submit.
>> 
>> I have a servlet and a tomcat server. As and when request comes, it creates 
>> a new SC and keeps it alive for the further requests, I am setting my master 
>> in sparkConf
>> 
>> as sparkConf.setMaster("yarn-cluster")
>> 
>> but the request is stuck indefinitely. 
>> 
>> This works when I set
>> sparkConf.setMaster("yarn-client")
>> 
>> I am not sure, why is it not launching job in yarn-cluster mode.
>> 
>> Any thoughts?
>> 
>> Thanks and Regards,
>> Archit Thakur. 
> 


Re: Where to save intermediate results?

2014-08-29 Thread huylv
Hi Daniel,

Your suggestion is definitely an interesting approach. In fact, I already
have another system to deal with the stream analytical processing part. So
basically, the Spark job to aggregate data just accumulatively computes
aggregations from historical data together with new batch, which has been
partly summarized by the stream processor. Answering queries involves
combining pre-calculated historical data with on-stream
aggregations. This sounds much like what Spark Streaming is intended to do.
So I'll take a look deeper into Spark Streaming to consider porting the
stream processing part to use Spark Streaming.

Regarding saving pre-calculated data onto external storages (disk,
database...), I'm looking at Cassandra for now. But I don't know how it fits
into my context and how its performance compares to saving to files in
HDFS. Also, is there any way to keep the precalculated data both on disk and
in memory, so that when the batch job terminates, the historical data is still
available in memory for combining with the stream processor, while still being
able to survive a system failure or upgrade? Not to mention the size of the
precalculated data might get too big; in that case, storing only the newest data
in memory would be better. Tachyon looks like a nice option but again,
I don't have experience with it and it's still an experimental feature of
Spark.

Regards,
Huy
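
On keeping pre-calculated data both in memory and on disk, a minimal sketch of
what Spark itself offers (names like "aggregates" and the HDFS paths are
assumptions, not from this thread): MEMORY_AND_DISK persistence for the running
application, plus an explicit save so the data survives a restart or upgrade.

import org.apache.spark.storage.StorageLevel

// Keep the aggregate RDD in memory, spilling to disk when it doesn't fit.
aggregates.persist(StorageLevel.MEMORY_AND_DISK)

// Also write it out so it survives application restarts or upgrades;
// the next run can reload it instead of recomputing.
aggregates.saveAsObjectFile("hdfs:///precomputed/aggregates")
val reloaded = sc.objectFile[((String, String), Double)]("hdfs:///precomputed/aggregates")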






Re: how can I get the number of cores

2014-08-29 Thread Nicholas Chammas
What version of Spark are you running?

Try calling sc.defaultParallelism. I’ve found that it is typically set to
the number of worker cores in your cluster.
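
A small sketch of what that looks like from a running SparkContext (figures are
approximations; treating getExecutorMemoryStatus as a proxy for the executor
count is an assumption, not an exact used/total core split like the web UI):

val parallelism = sc.defaultParallelism             // typically the total worker cores
val blockManagers = sc.getExecutorMemoryStatus.size // executors plus the driver
println(s"defaultParallelism = $parallelism, block managers = $blockManagers")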
​


On Fri, Aug 29, 2014 at 3:39 AM, Kevin Jung  wrote:

> Hi all
> Spark web ui gives me the information about total cores and used cores.
> I want to get this information programmatically.
> How can I do this?
>
> Thanks
> Kevin
>
>
>
>
>


Re: Spark Streaming checkpoint recovery causes IO re-execution

2014-08-29 Thread Yana Kadiyska
I understand that the DB writes are happening from the workers unless you
collect. My confusion is that you believe workers recompute on recovery("nodes
computations which get redone upon recovery"). My understanding is that
checkpointing dumps the RDD to disk and then cuts the RDD lineage. So I
thought on driver restart you'd get a set of new executor processes, but
they would read the last known state of the RDD from the HDFS checkpoint. Am I
off here?

So the only situation I can imagine where you end up recomputing is if you're
checkpointing at a larger interval than your batch size (i.e. the RDD on
disk does not reflect its last pre-crash state)?
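
For reference, a minimal sketch of plain RDD checkpointing, which is the
mechanism that cuts the lineage (paths are assumptions); Spark Streaming's
checkpointing layers DStream metadata on top of this:

// Write the materialized RDD to the checkpoint directory and truncate its lineage.
sc.setCheckpointDir("hdfs:///checkpoints")

val data = sc.textFile("hdfs:///input").map(_.length)
data.checkpoint()   // marked for checkpointing
data.count()        // the first action materializes the RDD and writes the checkpoint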


On Thu, Aug 28, 2014 at 1:32 PM, RodrigoB 
wrote:

> Hi Yana,
>
> The fact is that the DB writing is happening on the node level and not on
> Spark level. One of the benefits of distributed computing nature of Spark
> is
> enabling IO distribution as well. For example, is much faster to have the
> nodes to write to Cassandra instead of having them all collected at the
> driver level and sending the writes from there.
>
> The problem is that nodes computations which get redone upon recovery. If
> these lambda functions send events to other systems these events would get
> resent upon re-computation causing overall system instability.
>
> Hope this helps you understand the problematic.
>
> tnks,
> Rod
>
>
>
>
>


Re: Spark webUI - application details page

2014-08-29 Thread Brad Miller
How did you specify the HDFS path?  When i put

spark.eventLog.dir   hdfs://
crosby.research.intel-research.net:54310/tmp/spark-events

in my spark-defaults.conf file, I receive the following error:

An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.io.IOException: Call to
crosby.research.intel-research.net/10.212.84.53:54310 failed on local
exception: java.io.EOFException

-Brad


On Thu, Aug 28, 2014 at 12:26 PM, SK  wrote:

> I was able to recently solve this problem for standalone mode. For this
> mode,
> I did not use a history server. Instead, I set spark.eventLog.dir (in
> conf/spark-defaults.conf) to a directory in hdfs (basically this directory
> should be in a place that is writable by the master and accessible globally
> to all the nodes).
>
>
>
>
>


Spark Streaming reset state

2014-08-29 Thread Eko Susilo
Hi all,

I would like to ask some advice about resetting spark stateful operation.
so i tried like this:

JavaStreamingContext jssc = new JavaStreamingContext(context, new
Duration(5000));
jssc.remember(Duration(5*60*1000));
jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);
JavaPairReceiverInputDStream<String, String> messages =
    (JavaPairReceiverInputDStream<String, String>) KafkaUtils.createStream(jssc,
        "localhost:2181", "test-consumer-group", topicMap);
JavaPairDStream<String, String> windowed = messages.window(WINDOW_LENGTH,
    SLIDE_INTERVAL);
JavaDStream<LogEntry> lines = windowed.map(new Function<Tuple2<String, String>, LogEntry>() {
    @Override
    public LogEntry call(Tuple2<String, String> tuple2) {
        LogEntry _Result = Utils.parseLine(tuple2._2());
        return _Result;
    }
}).filter(Functions.FILTER_LOG_ENTRY).cache();

JavaPairDStream codes=lines.mapToPair(Functions.GET_CODE).
reduceByKey(Functions.SUM_REDUCER).
updateStateByKey(COMPUTE_RUNNING_SUM);
I thought that by setting remember to 5 minutes, the "codes" RDDs derived
from messages would also be reset after 5 minutes, but in fact they are not.

Is there any way to reset the "codes" RDD after a period of time (5
minutes)?

Thanks



-- 
Best Regards,
Eko Susilo


Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread bharatvenkat
Chris,

I did the DStream.repartition mentioned in the document on parallelism in
receiving, as well as set "spark.default.parallelism", and it still uses only
2 nodes in my cluster.  I notice there is another email thread on the same
topic:

http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html

My code is in Java and here is what I have:

JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(ssc, zkQuorum,
        "cse-job-play-consumer", kafkaTopicMap);

JavaPairDStream<String, String> newMessages =
    messages.repartition(partitionSize); // partitionSize = 30

JavaDStream<String> lines = newMessages.map(new Function<Tuple2<String, String>, String>() {
    ...
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

JavaDStream words = lines.flatMap(new
MetricsComputeFunction()
);

JavaPairDStream wordCounts = words.mapToPair(
new PairFunction() {
   ...
}
);

 wordCounts.foreachRDD(new Function,
Void>() {...});

Thanks,
Bharat
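
For what it's worth, besides repartition, the other documented way to get more
than one node receiving is to create several input DStreams and union them. A
Scala sketch (the thread's code is Java; ssc, zkQuorum and kafkaTopicMap here
are assumed to be the Scala equivalents, and the receiver count is an assumption):

import org.apache.spark.streaming.kafka.KafkaUtils

// Several receivers so that more than one worker node ingests data,
// then union them into a single DStream before repartitioning/processing.
val numReceivers = 4   // assumption: tune to your node / Kafka partition count
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, "cse-job-play-consumer", kafkaTopicMap)
}
val unified = ssc.union(kafkaStreams).repartition(30)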






/tmp/spark-events permissions problem

2014-08-29 Thread Brad Miller
Hi All,

Yesterday I restarted my cluster, which had the effect of clearing /tmp.
 When I brought Spark back up and ran my first job, /tmp/spark-events was
re-created and the job ran fine.  I later learned that other users were
receiving errors when trying to create a spark context.  It turned out the
reason was that only my user was able to create subdirectories within
/tmp/spark-events.

I believe /tmp/spark-events originally had ownership "bmiller1:bmiller1"
(where "bmiller1" is my username) with permissions 770.  Once I modified
the permission to allow other users to create subdirectories other users
were again able to launch jobs.

Note that I think this may be related to some problems I am having viewing
application history (see link).
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-td3490.html#a13130

Has anybody else experienced a problem with permissions on the
"spark.eventLog.dir" directory?

best,
-Brad


Re: Spark SQL : how to find element where a field is in a given set

2014-08-29 Thread Michael Armbrust
What version are you using?
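
If the DSL's in operator isn't available in that version, a workaround sketch
that sidesteps the DSL (the names here are assumptions, not from the thread):

// Register the table and use a SQL string with an IN clause instead.
// Assumes sqlContext is a SQLContext and people is a SchemaRDD of Person.
people.registerAsTable("people")   // renamed registerTempTable in Spark 1.1
val filtered = sqlContext.sql("SELECT name, age FROM people WHERE name IN ('foo', 'bar')")
filtered.collect().foreach(println)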


On Fri, Aug 29, 2014 at 2:22 AM, Jaonary Rabarisoa 
wrote:

> Still not working for me. I got a compilation error : *value in is not a
> member of Symbol.* Any ideas ?
>
>
> On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust 
> wrote:
>
>> To pass a list to a variadic function you can use the type ascription :_*
>>
>> For example:
>>
>> val longList = Seq[Expression]("a", "b", ...)
>> table("src").where('key in (longList: _*))
>>
>> Also, note that I had to explicitly specify Expression as the type
>> parameter of Seq to ensure that the compiler converts "a" and "b" into
>> Spark SQL expressions.
>>
>>
>>
>>
>> On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa 
>> wrote:
>>
>>> ok, but what if I have a long list do I need to hard code like this
>>> every element of my list of is there a function that translate a list into
>>> a tuple ?
>>>
>>>
>>> On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 You don't need the Seq, as in is a variadic function.

 personTable.where('name in ("foo", "bar"))



 On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa 
 wrote:

> Hi all,
>
> What is the expression that I should use with spark sql DSL if I need
> to retreive
> data with a field in a given set.
> For example :
>
> I have the following schema
>
> case class Person(name: String, age: Int)
>
> And I need to do something like :
>
> personTable.where('name in Seq("foo", "bar")) ?
>
>
> Cheers.
>
>
> Jaonary
>


>>>
>>
>


Re: Spark Streaming reset state

2014-08-29 Thread Sean Owen
"codes" is a DStream, not an RDD. The remember() method controls how
long Spark Streaming holds on to the RDDs itself. Clarify what you
mean by "reset"? codes provides a stream of RDDs that contain your
computation over a window of time. New RDDs come with the computation
over new data.
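
If the goal is to drop per-key state after a period of inactivity, one common
Scala pattern is to store a last-seen timestamp alongside the running sum and
return None from the update function once it is too old (a sketch with assumed
names, not code from this thread):

val timeoutMs = 5 * 60 * 1000L

// State is (running sum, last time the key was seen).
def updateWithTimeout(newValues: Seq[Long], state: Option[(Long, Long)]): Option[(Long, Long)] = {
  val now = System.currentTimeMillis()
  val (sum, lastSeen) = state.getOrElse((0L, now))
  if (newValues.isEmpty && now - lastSeen > timeoutMs) {
    None   // dropping the key resets its running sum
  } else {
    Some((sum + newValues.sum, if (newValues.nonEmpty) now else lastSeen))
  }
}

// counts is assumed to be a DStream[(String, Long)] of per-batch sums.
val codesWithTimeout = counts.updateStateByKey(updateWithTimeout _)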

On Fri, Aug 29, 2014 at 4:30 PM, Eko Susilo
 wrote:
> Hi all,
>
> I would like to ask some advice about resetting spark stateful operation.
> so i tried like this:
>
> JavaStreamingContext jssc = new JavaStreamingContext(context, new
> Duration(5000));
> jssc.remember(Duration(5*60*1000));
> jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);
> JavaPairReceiverInputDStream messages =
>(JavaPairReceiverInputDStream)
> KafkaUtils.createStream(jssc, "localhost:2181", "test-consumer-group",
> topicMap);
> JavaPairDStream windowed= messages.window(WINDOW_LENGTH,
> SLIDE_INTERVAL);
> JavaDStream lines = windowed.map(new Function String>, LogEntry>() { @Override public LogEntry call(Tuple2
> tuple2) { LogEntry _Result=Utils.parseLine(tuple2._2()); return _Result; }
> }).filter(Functions.FILTER_LOG_ENTRY).cache();
>
> JavaPairDStream codes=lines.mapToPair(Functions.GET_CODE).
> reduceByKey(Functions.SUM_REDUCER).
> updateStateByKey(COMPUTE_RUNNING_SUM);
> i thought by setting the remember to 5 minutes, the "codes" RDD that derived
> from messages would also be reseted in 5 minutes, but in fact no.
>
> Is there any way to reset the "codes" RDD after a period of time (5
> minutes)?
>
> Thanks
>
>
>
> --
> Best Regards,
> Eko Susilo




Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Jonathan Hodges
'this 2-node replication is mainly for failover in case the receiver dies
while data is in flight.  there's still chance for data loss as there's no
write ahead log on the hot path, but this is being addressed.'

Can you comment a little on how this will be addressed, will there be a
durable WAL?  Is there a JIRA for tracking this effort?

I am curious without WAL if you can avoid this data loss with explicit
management of Kafka offsets e.g. don't commit offset unless data is
replicated to multiple nodes or maybe not until processed.  The incoming
data will always be durably stored to disk in Kafka so can be replayed in
failure scenarios to avoid data loss if the offsets are managed properly.




On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly  wrote:

> @bharat-
>
> overall, i've noticed a lot of confusion about how Spark Streaming scales
> - as well as how it handles failover and checkpointing, but we can discuss
> that separately.
>
> there's actually 2 dimensions to scaling here:  receiving and processing.
>
> *Receiving*
> receiving can be scaled out by submitting new DStreams/Receivers to the
> cluster as i've done in the Kinesis example.  in fact, i purposely chose to
> submit multiple receivers in my Kinesis example because i feel it should be
> the norm and not the exception - particularly for partitioned and
> checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
> only way to scale.
>
> a side note here is that each receiver running in the cluster will
> immediately replicates to 1 other node for fault-tolerance of that specific
> receiver.  this is where the confusion lies.  this 2-node replication is
> mainly for failover in case the receiver dies while data is in flight.
>  there's still chance for data loss as there's no write ahead log on the
> hot path, but this is being addressed.
>
> this in mentioned in the docs here:
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>
> *Processing*
> once data is received, tasks are scheduled across the Spark cluster just
> like any other non-streaming task where you can specify the number of
> partitions for reduces, etc.  this is the part of scaling that is sometimes
> overlooked - probably because it "works just like regular Spark", but it is
> worth highlighting.
>
> Here's a blurb in the docs:
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing
>
> the other thing that's confusing with Spark Streaming is that in Scala,
> you need to explicitly
>
> import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
>
> in order to pick up the implicits that allow DStream.reduceByKey and such
> (versus DStream.transform(rddBatch => rddBatch.reduceByKey())
>
> in other words, DStreams appear to be relatively featureless until you
> discover this implicit.  otherwise, you need to operate on the underlying
> RDD's explicitly which is not ideal.
>
> the Kinesis example referenced earlier in the thread uses the DStream
> implicits.
>
>
> side note to all of this - i've recently convinced my publisher for my
> upcoming book, Spark In Action, to let me jump ahead and write the Spark
> Streaming chapter ahead of other more well-understood libraries.  early
> release is in a month or so.  sign up  @ http://sparkinaction.com if you
> wanna get notified.
>
> shameless plug that i wouldn't otherwise do, but i really think it will
> help clear a lot of confusion in this area as i hear these questions asked
> a lot in my talks and such.  and i think a clear, crisp story on scaling
> and fault-tolerance will help Spark Streaming's adoption.
>
> hope that helps!
>
> -chris
>
>
>
>
> On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> I agree. This issue should be fixed in Spark rather rely on replay of
>> Kafka messages.
>>
>> Dib
>> On Aug 28, 2014 6:45 AM, "RodrigoB"  wrote:
>>
>>> Dibyendu,
>>>
>>> Tnks for getting back.
>>>
>>> I believe you are absolutely right. We were under the assumption that the
>>> raw data was being computed again and that's not happening after further
>>> tests. This applies to Kafka as well.
>>>
>>> The issue is of major priority fortunately.
>>>
>>> Regarding your suggestion, I would maybe prefer to have the problem
>>> resolved
>>> within Spark's internals since once the data is replicated we should be
>>> able
>>> to access it once more and not having to pool it back again from Kafka or
>>> any other stream that is being affected by this issue. If for example
>>> there
>>> is a big amount of batches to be recomputed I would rather have them done
>>> distributed than overloading the batch interval with huge amount of Kafka
>>> messages.
>>>
>>> I do not have yet enough know how on where is the issue and about the
>>> internal Spark code so I can't really how much difficult will be the
>>> implementation.
>>>
>>> tnks,
>>> Rod
>>>
>>>
>>>
>>>

Re: Spark Streaming reset state

2014-08-29 Thread Eko Susilo
so the "codes" currently holding RDD containing codes and its respective
counter. I would like to find a way to reset those RDD after some period of
time.


On Fri, Aug 29, 2014 at 5:55 PM, Sean Owen  wrote:

> "codes" is a DStream, not an RDD. The remember() method controls how
> long Spark Streaming holds on to the RDDs itself. Clarify what you
> mean by "reset"? codes provides a stream of RDDs that contain your
> computation over a window of time. New RDDs come with the computation
> over new data.
>
> On Fri, Aug 29, 2014 at 4:30 PM, Eko Susilo
>  wrote:
> > Hi all,
> >
> > I would like to ask some advice about resetting spark stateful operation.
> > so i tried like this:
> >
> > JavaStreamingContext jssc = new JavaStreamingContext(context, new
> > Duration(5000));
> > jssc.remember(Duration(5*60*1000));
> > jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);
> > JavaPairReceiverInputDStream messages =
> >(JavaPairReceiverInputDStream)
> > KafkaUtils.createStream(jssc, "localhost:2181", "test-consumer-group",
> > topicMap);
> > JavaPairDStream windowed= messages.window(WINDOW_LENGTH,
> > SLIDE_INTERVAL);
> > JavaDStream lines = windowed.map(new Function > String>, LogEntry>() { @Override public LogEntry call(Tuple2 String>
> > tuple2) { LogEntry _Result=Utils.parseLine(tuple2._2()); return _Result;
> }
> > }).filter(Functions.FILTER_LOG_ENTRY).cache();
> >
> > JavaPairDStream codes=lines.mapToPair(Functions.GET_CODE).
> > reduceByKey(Functions.SUM_REDUCER).
> > updateStateByKey(COMPUTE_RUNNING_SUM);
> > i thought by setting the remember to 5 minutes, the "codes" RDD that
> derived
> > from messages would also be reseted in 5 minutes, but in fact no.
> >
> > Is there any way to reset the "codes" RDD after a period of time (5
> > minutes)?
> >
> > Thanks
> >
> >
> >
> > --
> > Best Regards,
> > Eko Susilo
>



-- 
Best Regards,
Eko Susilo


Re: Spark webUI - application details page

2014-08-29 Thread Sudha Krishna
I specified as follows:

spark.eventLog.dir /mapr/spark_io

We use mapr fs for sharing files. I did not provide an ip address or port
number - just the directory name on the shared filesystem.
On Aug 29, 2014 8:28 AM, "Brad Miller"  wrote:

> How did you specify the HDFS path?  When i put
>
> spark.eventLog.dir   hdfs://
> crosby.research.intel-research.net:54310/tmp/spark-events
>
> in my spark-defaults.conf file, I receive the following error:
>
> An error occurred while calling
> None.org.apache.spark.api.java.JavaSparkContext.
> : java.io.IOException: Call to
> crosby.research.intel-research.net/10.212.84.53:54310 failed on local
> exception: java.io.EOFException
>
> -Brad
>
>
> On Thu, Aug 28, 2014 at 12:26 PM, SK  wrote:
>
>> I was able to recently solve this problem for standalone mode. For this
>> mode,
>> I did not use a history server. Instead, I set spark.eventLog.dir (in
>> conf/spark-defaults.conf) to a directory in hdfs (basically this directory
>> should be in a place that is writable by the master and accessible
>> globally
>> to all the nodes).
>>
>>
>>
>>
>>
>


RE: Q on downloading spark for standalone cluster

2014-08-29 Thread Sagar, Sanjeev
Hello Sparkies !

Could anyone please answer this? This is not a Hadoop cluster, so which
download option should I use for a standalone cluster?

Also, what are the best practices if you’ve got 1TB of data and want to use Spark?
Do you have to use Hadoop/CDH or some other option?

Appreciate it.

From: Sagar, Sanjeev [mailto:sanjeev.sa...@mypointscorp.com]
Sent: Thursday, August 28, 2014 2:44 PM
To: Daniel Siegmann
Cc: user@spark.apache.org
Subject: RE: Q on downloading spark for standalone cluster

Hello Daniel, if you’re not using Hadoop then why would you want to grab the Hadoop
package? CDH5 will download all the Hadoop packages and Cloudera Manager too.

Just curious: what happens if you start Spark on an EC2 cluster? What does it choose
as the default data store?

-Sanjeev

From: Daniel Siegmann [mailto:daniel.siegm...@velos.io]
Sent: Thursday, August 28, 2014 2:04 PM
To: Sagar, Sanjeev
Cc: user@spark.apache.org
Subject: Re: Q on downloading spark for standalone cluster

If you aren't using Hadoop, I don't think it matters which you download. I'd 
probably just grab the Hadoop 2 package.
Out of curiosity, what are you using as your data store? I get the impression 
most Spark users are using HDFS or something built on top.

On Thu, Aug 28, 2014 at 4:07 PM, Sanjeev Sagar 
mailto:sanjeev.sa...@mypointscorp.com>> wrote:
Hello there,

I have a basic question on the download: which option do I need to download for a
standalone cluster?

I have a private cluster of three machines on CentOS. When I click on download it
shows me the following:


   Download Spark

The latest release is Spark 1.0.2, released August 5, 2014 (release notes) (git tag)

Pre-built packages:

 * For Hadoop 1 (HDP1, CDH3): find an Apache mirror or direct file download
 * For CDH4: find an Apache mirror or direct file download
 * For Hadoop 2 (HDP2, CDH5): find an Apache mirror or direct file download

Pre-built packages, third-party (NOTE: may include non ASF-compatible licenses):

 * For MapRv3: direct file download (external)
 * For MapRv4: direct file download (external)


From the above it looks like I have to download Hadoop or CDH4 first in
order to use Spark? I have a standalone cluster and my data size is in the
hundreds of gigabytes, close to a terabyte.

I don't get which one I need to download from the above list.

Could someone assist me with which one I need to download for a standalone
cluster and for a big data footprint?

Or is Hadoop needed or mandatory for using Spark? That's not my understanding.
My understanding is that you can use Spark with Hadoop if you like, via YARN 2,
but you can also use Spark standalone without Hadoop.

Please assist. I'm confused!

-Sanjeev





--
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: 
www.velos.io


Announce: Smoke - a web frontend to Spark

2014-08-29 Thread Horacio G. de Oro
Hi everyone! I've been working on Smoke, a web frontend to
interactively launch Spark jobs without compiling them (it only supports
Scala right now, and launches the jobs in yarn-client mode). It works by
executing the Scala script using "spark-shell" on the Spark server.

It's developed in Python, uses Celery/Redis to stream the job logs to
the web and is easy to install.

The project is on GitHub: https://github.com/data-tsunami/smoke
(install instructions, screenshots, etc.). It's in an early
development stage, but very usable.

Thanks!

Horacio


--

Horacio G. de Oro
Data Tsunami

Email: hgde...@gmail.com
  Web: http://www.data-tsunami.com/english/
  Cel: +54 9 3572 525359
 LinkedIn: https://www.linkedin.com/in/hgdeoro




Re: Spark Hive max key length is 767 bytes

2014-08-29 Thread Michael Armbrust
Spark SQL is based on Hive 12.  They must have changed the maximum key size
between 12 and 13.


On Fri, Aug 29, 2014 at 4:38 AM, arthur.hk.c...@gmail.com <
arthur.hk.c...@gmail.com> wrote:

> Hi,
>
>
> Tried the same thing in HIVE directly without issue:
>
> HIVE:
> hive> create table test_datatype2 (testbigint bigint );
> OK
> Time taken: 0.708 seconds
>
> hive> drop table test_datatype2;
> OK
> Time taken: 23.272 seconds
>
>
>
> Then tried again in SPARK:
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> 14/08/29 19:33:52 INFO Configuration.deprecation:
> mapred.reduce.tasks.speculative.execution is deprecated. Instead, use
> mapreduce.reduce.speculative
> hiveContext: org.apache.spark.sql.hive.HiveContext =
> org.apache.spark.sql.hive.HiveContext@395c7b94
>
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> res0: org.apache.spark.sql.SchemaRDD =
> SchemaRDD[0] at RDD at SchemaRDD.scala:104
> == Query Plan ==
> 
>
> scala> hiveContext.hql("drop table test_datatype3")
>
> 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown
> while adding/validating class(es) : Specified key was too long; max key
> length is 767 bytes
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key
> was too long; max key length is 767 bytes
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
>
> 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of
> org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted
> in no possible candidates
> Error(s) were found while auto-creating/validating the datastore for
> classes. The errors are printed in the log, and are attached to this
> exception.
> org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found
> while auto-creating/validating the datastore for classes. The errors are
> printed in the log, and are attached to this exception.
>  at
> org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)
>
>
> Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException:
> Specified key was too long; max key length is 767 bytes
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>  at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
>
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
> "embedded-only" so does not have its own datastore table.
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
> "embedded-only" so does not have its own datastore table.
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
> "embedded-only" so does not have its own datastore table.
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
> "embedded-only" so does not have its own datastore table.
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
> "embedded-only" so does not have its own datastore table.
> 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
> "embedded-only" so does not have its own datastore table.
> 14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown
> while adding/validating class(es) : Specified key was too long; max key
> length is 767 bytes
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key
> was too long; max key length is 767 bytes
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>
>
> Can anyone please help?
>
> Regards
> Arthur
>
>
> On 29 Aug, 2014, at 12:47 pm, arthur.hk.c...@gmail.com <
> arthur.hk.c...@gmail.com> wrote:
>
> (Please ignore if duplicated)
>
>
> Hi,
>
> I use Spark 1.0.2 with Hive 0.13.1
>
> I have already set the hive mysql database to latine1;
>
> mysql:
> alter database hive character set latin1;
>
> Spark:
> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> scala> hiveContext.hql("create table test_datatype1 (testbigint bigint )")
> scala> hiveContext.hql("drop table test_datatype1")
>
>
> 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
> "embedded-only" so does not have its own datastore table.
> 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
> "embedded-only" so does not have its own datastore table.
> 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
> "embedded-on

Re: Change delimiter when collecting SchemaRDD

2014-08-29 Thread yadid ayzenberg
Thanks Michael, that makes total sense.
It works perfectly.

Yadid


On Thu, Aug 28, 2014 at 9:19 PM, Michael Armbrust 
wrote:

> The comma is just the way the default toString works for Row objects.
>  Since SchemaRDDs are also RDDs, you can do arbitrary transformations on
> the Row objects that are returned.
>
> For example, if you'd rather the delimiter was '|':
>
> sql("SELECT * FROM src").map(_.mkString("|")).collect()
>
>
> On Thu, Aug 28, 2014 at 7:58 AM, yadid ayzenberg 
> wrote:
>
>> Hi All,
>>
>> Is there any way to change the delimiter from being a comma ?
>> Some of the strings in my data contain commas as well, making it very
>> difficult to parse the results.
>>
>> Yadid
>>
>
>


Problem Accessing Hive Table from hiveContext

2014-08-29 Thread Zitser, Igor
Hi All, 
New to spark and using Spark 1.0.2 and hive 0.12. 

If the hive table is created as test_datatypes(testbigint bigint, ss bigint), then "select 
* from test_datatypes" from spark works fine.

For "create table test_datatypes(testbigint bigint, testdec decimal(5,2) )"

scala> val dataTypes=hiveContext.hql("select * from test_datatypes")
14/08/28 21:18:44 INFO parse.ParseDriver: Parsing command: select * from 
test_datatypes
14/08/28 21:18:44 INFO parse.ParseDriver: Parse Completed
14/08/28 21:18:44 INFO analysis.Analyzer: Max iterations (2) reached for batch 
MultiInstanceRelations
14/08/28 21:18:44 INFO analysis.Analyzer: Max iterations (2) reached for batch 
CaseInsensitiveAttributeReferences
java.lang.IllegalArgumentException: Error: ',', ':', or ';' expected at 
position 14 from 'bigint:decimal(5,2)' [0:bigint, 6::, 7:decimal, 14:(, 15:5, 
16:,, 17:2, 18:)]
    at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseTypeInfos(TypeInfoUtils.java:312)
    at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfosFromTypeString(TypeInfoUtils.java:716)
    at 
org.apache.hadoop.hive.serde2.lazy.LazyUtils.extractColumnInfo(LazyUtils.java:364)
    at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initSerdeParams(LazySimpleSerDe.java:288)
    at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:187)
    at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:218)
    at 
org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:272)
    at 
org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:175)
    at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:991)
    at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:924)
    at 
org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:58)
    at 
org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:143)
    at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:122)
    at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:122)
    at scala.Option.getOrElse(Option.scala:120)
    at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:122)
    at 
org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:149)
    at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$2.applyOrElse(Analyzer.scala:83)
    at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$2.applyOrElse(Analyzer.scala:81)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)


The same exception happens when the table is created as "create table test_datatypes(testbigint 
bigint, testdate date)". 

Thanks, Igor.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Too many open files

2014-08-29 Thread SK
Hi,

I am having the same problem reported by Michael. I am trying to open 30
files. ulimit -n  shows the limit is 1024. So I am not sure why the program
is failing with  "Too many open files" error. The total size of all the 30
files is 230 GB. 
I am running the job on a cluster with 10 nodes, each having 16 GB. The
error appears to be happening at the distinct() stage.

Here is my program. In the following code, are all 10 nodes trying to
open all of the 30 files, or are the files distributed among the 10 nodes?

val baseFile = "/mapr/mapr_dir/files_2013apr*"
val x = sc.textFile(baseFile).map { line =>
  val fields = line.split("\t")
  (fields(11), fields(6))
}.distinct().countByKey()
val xrdd = sc.parallelize(x.toSeq)
xrdd.saveAsTextFile(...)

Instead of using the glob *, I guess I can try using a for loop to read the
files one by one if that helps, but not sure if there is a more efficient
solution. 
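
One knob that is sometimes suggested for this symptom, as a sketch only (the assumption
being that the exhausted handles are shuffle files, as the path in the transcript below
suggests, rather than the input files; raising ulimit -n for the user that actually runs
the executors is the other usual remedy):

import org.apache.spark.{SparkConf, SparkContext}

// Consolidate shuffle outputs so each reduce task opens fewer files.
// The app name is hypothetical and this is not a confirmed fix for the job above.
val conf = new SparkConf()
  .setAppName("distinct-count-by-key")
  .set("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext(conf)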

The following is the error transcript: 

Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent
failure: Exception failure in TID 902 on host 192.168.13.11:
java.io.FileNotFoundException:
/tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open
files) 
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.(FileOutputStream.java:221)
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744) Driver stacktrace:





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Possible to make one executor be able to work on multiple tasks simultaneously?

2014-08-29 Thread Victor Tso-Guillen
I'm thinking of local mode where multiple virtual executors occupy the same
vm. Can we have the same configuration in spark standalone cluster mode?


Re: Possible to make one executor be able to work on multiple tasks simultaneously?

2014-08-29 Thread Matei Zaharia
Yes, executors run one task per core of your machine by default. You can also 
manually launch them with more worker threads than you have cores. What cluster 
manager are you on?

Matei

On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com) wrote:

I'm thinking of local mode where multiple virtual executors occupy the same vm. 
Can we have the same configuration in spark standalone cluster mode?

SparkSql is slow over yarn

2014-08-29 Thread Chirag Aggarwal
When I run SparkSql over yarn, it runs 2-4 times slower than when it's 
run in local mode. Please note that I have a four-node yarn setup. Has anyone 
else witnessed the same?



Re: DStream repartitioning, performance tuning processing

2014-08-29 Thread Tim Smith
I wrote a long post about how I arrived here but in a nutshell I don't see
evidence of re-partitioning and workload distribution across the cluster.
My new fangled way of starting the job is:

run=`date +"%m-%d-%YT%T"`; \
nohup spark-submit --class logStreamNormalizer \
--master yarn log-stream-normalizer_2.10-1.0.jar \
--jars
spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
\
--driver-memory 8G \
--executor-memory 30G \
--executor-cores 16 \
--num-executors 8 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 16 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
>logs/normRunLog-$run.log \
2>logs/normRunLogError-$run.log & \
echo $! > logs/run-$run.pid

Since the job spits out lots of logs, here is how I am trying to determine
if any tasks got assigned to non-local executors.
$ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log  | grep Starting
| grep -v NODE_LOCAL | grep -v PROCESS_LOCAL

Yields no lines.

If I look at resource pool usage in YARN, this app is assigned 252.5GB of
memory, 128 VCores and 9 containers. Am I missing something here?

Thanks,

Tim







On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith  wrote:

> I set partitions to 64:
>
> //
>  kInMsg.repartition(64)
>  val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
> //
>
> Still see all activity only on the two nodes that seem to be receiving
> from Kafka.
>
> On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith  wrote:
> > TD - Apologies, didn't realize I was replying to you instead of the list.
> >
> > What does "numPartitions" refer to when calling createStream? I read an
> > earlier thread that seemed to suggest that numPartitions translates to
> > partitions created on the Spark side?
> >
> http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
> >
> > Actually, I re-tried with 64 numPartitions in createStream and that
> didn't
> > work. I will manually set "repartition" to 64/128 and see how that goes.
> >
> > Thanks.
> >
> >
> >
> >
> > On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das <
> tathagata.das1...@gmail.com>
> > wrote:
> >>
> >> Having 16 partitions in KafkaUtils.createStream does not translate to
> the
> >> RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
> >> best way to distribute the received data between all the nodes, as long
> as
> >> there are sufficient number of partitions (try setting it to 2x the
> number
> >> cores given to the application).
> >>
> >> Yeah, in 1.0.0, ttl should be unnecessary.
> >>
> >>
> >>
> >> On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith  wrote:
> >>>
> >>> On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das
> >>>  wrote:
> 
>  If you are repartitioning to 8 partitions, and your node happen to
> have
>  at least 4 cores each, its possible that all 8 partitions are
> assigned to
>  only 2 nodes. Try increasing the number of partitions. Also make sure
> you
>  have executors (allocated by YARN) running on more than two nodes if
> you
>  want to use all 11 nodes in your yarn cluster.
> >>>
> >>>
> >>> If you look at the code, I commented out the manual re-partitioning to
> 8.
> >>> Instead, I am created 16 partitions when I call createStream. But I
> will
> >>> increase the partitions to, say, 64 and see if I get better
> parallelism.
> >>>
> 
> 
>  If you are using Spark 1.x, then you dont need to set the ttl for
>  running Spark Streaming. In case you are using older version, why do
> you
>  want to reduce it? You could reduce it, but it does increase the risk
> of the
>  premature cleaning, if once in a while things get delayed by 20
> seconds. I
>  dont see much harm in keeping the ttl at 60 seconds (a bit of extra
> garbage
>  shouldnt hurt performance).
> 
> >>>
> >>> I am running 1.0.0 (CDH5) so ttl setting is redundant? But you are
> right,
> >>> unless I have memory issues, more aggressive pruning won't help.
> >>>
> >>> Thanks,
> >>>
> >>> Tim
> >>>
> >>>
> >>>
> 
>  TD
> 
> 
>  On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith  wrote:
> >
> > Hi,
> >
> > In my streaming app, I receive from kafka where I have tried setting
> > the partitions when calling "createStream" or later, by calling
> repartition
> > - in both cases, the number of nodes running the tasks seems to be
> > stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was
> hoping to
> > use more nodes.
> >
> > I am starting the job as:
> > nohup spark-submit --class logStreamNormalizer --master yarn
> > log-stream-normalizer_2.10-1.0.jar --jars
> >
> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jack

Re: Possible to make one executor be able to work on multiple tasks simultaneously?

2014-08-29 Thread Victor Tso-Guillen
Standalone. I'd love to tell it that my one executor can simultaneously
serve, say, 16 tasks at once for an arbitrary number of distinct jobs.


On Fri, Aug 29, 2014 at 11:29 AM, Matei Zaharia 
wrote:

> Yes, executors run one task per core of your machine by default. You can
> also manually launch them with more worker threads than you have cores.
> What cluster manager are you on?
>
> Matei
>
> On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com)
> wrote:
>
>  I'm thinking of local mode where multiple virtual executors occupy the
> same vm. Can we have the same configuration in spark standalone cluster
> mode?
>
>


Re: Spark SQL : how to find element where a field is in a given set

2014-08-29 Thread Jaonary Rabarisoa
1.0.2


On Friday, August 29, 2014, Michael Armbrust  wrote:

> What version are you using?
>
>
>
> On Fri, Aug 29, 2014 at 2:22 AM, Jaonary Rabarisoa  > wrote:
>
>> Still not working for me. I got a compilation error : *value in is not a
>> member of Symbol.* Any ideas ?
>>
>>
>> On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust > > wrote:
>>
>>> To pass a list to a variadic function you can use the type ascription
>>> :_*
>>>
>>> For example:
>>>
>>> val longList = Seq[Expression]("a", "b", ...)
>>> table("src").where('key in (longList: _*))
>>>
>>> Also, note that I had to explicitly specify Expression as the type
>>> parameter of Seq to ensure that the compiler converts "a" and "b" into
>>> Spark SQL expressions.
>>>
>>>
>>>
>>>
>>> On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa >> > wrote:
>>>
 ok, but what if I have a long list do I need to hard code like this
 every element of my list of is there a function that translate a list into
 a tuple ?


 On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust <
 mich...@databricks.com
 > wrote:

> You don't need the Seq, as in is a variadic function.
>
> personTable.where('name in ("foo", "bar"))
>
>
>
> On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa  > wrote:
>
>> Hi all,
>>
>> What is the expression that I should use with spark sql DSL if I need
>> to retreive
>> data with a field in a given set.
>> For example :
>>
>> I have the following schema
>>
>> case class Person(name: String, age: Int)
>>
>> And I need to do something like :
>>
>> personTable.where('name in Seq("foo", "bar")) ?
>>
>>
>> Cheers.
>>
>>
>> Jaonary
>>
>
>

>>>
>>
>


Re: DStream repartitioning, performance tuning processing

2014-08-29 Thread Tim Smith
Crash again. On the driver, logs say:
14/08/29 19:04:55 INFO BlockManagerMaster: Removed 7 successfully in
removeExecutor
org.apache.spark.SparkException: Job aborted due to stage failure: Task
2.0:0 failed 4 times, most recent failure: TID 6383 on host
node-dn1-2-acme.com failed for unknown reason
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I go look at OS on node-dn1-2 and container logs for TID6383 but find
nothing.
# grep 6383 stderr
14/08/29 18:52:51 INFO CoarseGrainedExecutorBackend: Got assigned task 6383
14/08/29 18:52:51 INFO Executor: Running task ID 6383

However, the last message on the container is timestamped "19:04:51", which tells
me the executor was killed for some reason right before the driver noticed
the executor/task failure.

How come my task failed after only 4 attempts although my config says the failure
threshold is 64?








On Fri, Aug 29, 2014 at 12:00 PM, Tim Smith  wrote:

> I wrote a long post about how I arrived here but in a nutshell I don't see
> evidence of re-partitioning and workload distribution across the cluster.
> My new fangled way of starting the job is:
>
> run=`date +"%m-%d-%YT%T"`; \
> nohup spark-submit --class logStreamNormalizer \
> --master yarn log-stream-normalizer_2.10-1.0.jar \
> --jars
> spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
> \
> --driver-memory 8G \
> --executor-memory 30G \
> --executor-cores 16 \
> --num-executors 8 \
> --spark.serializer org.apache.spark.serializer.KryoSerializer \
> --spark.rdd.compress true \
> --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
> --spark.akka.threads 16 \
> --spark.task.maxFailures 64 \
> --spark.scheduler.mode FAIR \
> >logs/normRunLog-$run.log \
> 2>logs/normRunLogError-$run.log & \
> echo $! > logs/run-$run.pid
>
> Since the job spits out lots of logs, here is how I am trying to determine
> if any tasks got assigned to non-local executors.
> $ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log  | grep Starting
> | grep -v NODE_LOCAL | grep -v PROCESS_LOCAL
>
> Yields no lines.
>
> If I look at resource pool usage in YARN, this app is assigned 252.5GB of
> memory, 128 VCores and 9 containers. Am I missing something here?
>
> Thanks,
>
> Tim
>
>
>
>
>
>
>
> On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith  wrote:
>
>> I set partitions to 64:
>>
>> //
>>  kInMsg.repartition(64)
>>  val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))
>> //
>>
>> Still see all activity only on the two nodes that seem to be receiving
>> from Kafka.
>>
>> On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith  wrote:
>> > TD - Apologies, didn't realize I was replying to you instead of the
>> list.
>> >
>> > What does "numPartitions" refer to when calling createStream? I read an
>> > earlier thread that seemed to suggest that numPartitions translates to
>> > partitions created on the Spark side?
>> >
>> http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
>> >
>> > Actually, I re-tried with 64 numPartitions in createStream and that
>> didn't
>> > work. I will manually set "repartition" to 64/1

Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow

2014-08-29 Thread Aris
Hi folks,

I am trying to use Kafka with Spark Streaming, and it appears I cannot do
the normal 'sbt package' as I do with other Spark applications, such as
Spark alone or Spark with MLlib. I learned I have to build with the
sbt-assembly plugin.

OK, so here is my build.sbt file for my extremely simple test Kafka/Spark
Streaming project. It Takes almost 30 minutes to build! This is a Centos
Linux machine on SSDs with 4GB of RAM, it's never been slow for me. To
compare, sbt assembly for the entire Spark project itself takes less than
10 minutes.

At the bottom of this file I am trying to play with 'cacheOutput' options,
because I read online that maybe I am calculating SHA-1 for all the *.class
files in this super JAR.

I also copied the mergeStrategy from Spark contributor TD's Spark Streaming
tutorial at Spark Summit 2014.

Again, is there some better way to build this JAR file, just using sbt
package? This process is working, but very slow.

Any help with speeding up this compilation is really appreciated!!

Aris

-

import AssemblyKeys._ // put this at the top of the file

name := "streamingKafka"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
)

assemblySettings

jarName in assembly := "streamingkafka-assembly.jar"

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")           => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")       => MergeStrategy.discard
  case "log4j.properties"                                   => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/")  => MergeStrategy.filterDistinctLines
  case "reference.conf"                                     => MergeStrategy.concat
  case _                                                    => MergeStrategy.first
}

assemblyOption in assembly ~= { _.copy(cacheOutput = false) }


Re: SparkSql is slow over yarn

2014-08-29 Thread Nishkam Ravi
Can you share more details about your job, cluster properties and
configuration parameters?

Thanks,
Nishkam


On Fri, Aug 29, 2014 at 11:33 AM, Chirag Aggarwal <
chirag.aggar...@guavus.com> wrote:

>  When I run SparkSql over yarn, it runs 2-4 times slower as compared to
> when its run in local mode. Please note that I have a four node yarn setup.
> Has anyone else also witnessed the same.
>
>


Re: Anyone know hot to submit spark job to yarn in java code?

2014-08-29 Thread Archit Thakur
Hi,

I am facing the same problem.
Did you find any solution or work around?

Thanks and Regards,
Archit Thakur.


On Thu, Jan 16, 2014 at 6:22 AM, Liu, Raymond  wrote:

> Hi
>
> Regarding your question
>
> 1) when I run the above script, which jar is beed submitted to the yarn
> server ?
>
> What SPARK_JAR env point to and the --jar point to are both submitted to
> the yarn server
>
> 2) It like the  spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar
> plays the role of client side and
> spark-examples-assembly-0.8.1-incubating.jar goes with spark runtime and
> examples which will be running in yarn, am I right?
>
> The spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar will also go to
> yarn cluster as runtime for app
> jar(spark-examples-assembly-0.8.1-incubating.jar)
>
> 3) Does anyone have any similar experience ? I did lots of hadoop MR stuff
> and want follow the same logic to submit spark job. For now I can only find
> the command line way to submit spark job to yarn. I believe there is a easy
> way to integration spark in a web allocation.
>
> You can use the yarn-client mode, you might want to take a look on docs/
> running-on-yarn.md, and probably you might want to try master branch to
> check our latest update on this part of docs. And in yarn client mode, the
> sparkcontext itself will do similar thing as what the command line is doing
> to submit a yarn job
>
> Then to use it with java, you might want to try out JavaSparkContext
> instead of SparkContext, I don't personally run it with complicated
> applications. But a small example app did works.
>
>
> Best Regards,
> Raymond Liu
>
> -Original Message-
> From: John Zhao [mailto:jz...@alpinenow.com]
> Sent: Thursday, January 16, 2014 2:25 AM
> To: u...@spark.incubator.apache.org
> Subject: Anyone know hot to submit spark job to yarn in java code?
>
> Now I am working on a web application and  I want to  submit a spark job
> to hadoop yarn.
> I have already do my own assemble and  can run it in command line by the
> following script:
>
> export YARN_CONF_DIR=/home/gpadmin/clusterConfDir/yarn
> export
> SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar
> ./spark-class org.apache.spark.deploy.yarn.Client  --jar
> ./examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar
> --class org.apache.spark.examples.SparkPi --args yarn-standalone
> --num-workers 3 --master-memory 1g --worker-memory 512m --worker-cores 1
>
> It works fine.
> The I realized that it is hard to submit the job from a web application
> .Looks like the spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar or
> spark-examples-assembly-0.8.1-incubating.jar is a really big jar. I believe
> it contains everything .
> So my question is :
> 1) when I run the above script, which jar is beed submitted to the yarn
> server ?
> 2) It loos like the  spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar
> plays the role of client side and
> spark-examples-assembly-0.8.1-incubating.jar goes with spark runtime and
> examples which will be running in yarn, am I right?
> 3) Does anyone have any similar experience ? I did lots of hadoop MR stuff
> and want follow the same logic to submit spark job. For now I can only find
> the command line way to submit spark job to yarn. I believe there is a easy
> way to integration spark in a web allocation.
>
>
> Thanks.
> John.
>


[PySpark] large # of partitions causes OOM

2014-08-29 Thread Nick Chammas
Here’s a repro for PySpark:

a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(24000)
a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)

When I try this on an EC2 cluster with 1.1.0-rc2 and Python 2.7, this is
what I get:

>>> a = sc.parallelize(["Nick", "John", "Bob"])
>>> a = a.repartition(24000)
>>> a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
14/08/29 21:53:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-10-138-29-167.ec2.internal, 46252, 0) with no recent heart beats: 175143ms exceeds 45000ms
14/08/29 21:53:50 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(10, ip-10-138-18-106.ec2.internal, 33711, 0) with no recent heart beats: 175359ms exceeds 45000ms
14/08/29 21:54:02 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(19, ip-10-139-36-207.ec2.internal, 52208, 0) with no recent heart beats: 173061ms exceeds 45000ms
14/08/29 21:54:13 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(5, ip-10-73-142-70.ec2.internal, 56162, 0) with no recent heart beats: 176816ms exceeds 45000ms
14/08/29 21:54:22 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, ip-10-236-145-200.ec2.internal, 40959, 0) with no recent heart beats: 182241ms exceeds 45000ms
14/08/29 21:54:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(4, ip-10-139-1-195.ec2.internal, 49221, 0) with no recent heart beats: 178406ms exceeds 45000ms
14/08/29 21:54:41 ERROR Utils: Uncaught exception in thread Result resolver thread-3
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at 
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
at 
org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "Result resolver thread-3" 14/08/29 21:56:26 ERROR
SendingConnection: Exception while reading SendingConnection to
ConnectionManagerId(ip-10-73-142-223.ec2.internal,54014)
java.nio.channels.ClosedChannelException
at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
at 
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at 
org.apache.spark.serializer.KryoSerializerInstance.dese

Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Good to see I am not the only one who cannot get incoming Dstreams to
repartition. I tried repartition(512) but still no luck - the app
stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
release notes for 1.0.1 and 1.0.2, I don't see anything that says this
was an issue and has been fixed.

How do I debug the repartition() statement to see what's the flow
after the job hits that statement?
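
One quick way to check (a sketch; kInMsg stands for whatever DStream the job repartitions,
and note that repartition returns a new DStream rather than changing the original):

// Log the partition count of each batch produced by the repartitioned stream.
// If it never reports 512, the repartitioned stream is not the one being consumed.
val repartitioned = kInMsg.repartition(512)
repartitioned.foreachRDD { rdd =>
  println("partitions after repartition: " + rdd.partitions.size)
}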


On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat  wrote:
> Chris,
>
> I did the Dstream.repartition mentioned in the document on parallelism in
> receiving, as well as set "spark.default.parallelism" and it still uses only
> 2 nodes in my cluster.  I notice there is another email thread on the same
> topic:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
>
> My code is in Java and here is what I have:
>
>JavaPairReceiverInputDStream messages =
>
> KafkaUtils.createStream(ssc, zkQuorum,
> "cse-job-play-consumer", kafkaTopicMap);
>
> JavaPairDStream newMessages =
> messages.repartition(partitionSize);// partitionSize=30
>
> JavaDStream lines = newMessages.map(new
> Function, String>() {
> ...
>
> public String call(Tuple2 tuple2) {
>   return tuple2._2();
> }
>   });
>
> JavaDStream words = lines.flatMap(new
> MetricsComputeFunction()
> );
>
> JavaPairDStream wordCounts = words.mapToPair(
> new PairFunction() {
>...
> }
> );
>
>  wordCounts.foreachRDD(new Function,
> Void>() {...});
>
> Thanks,
> Bharat
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark Streaming] kafka consumer announce

2014-08-29 Thread Evgeniy Shishkin
TD, 

can you please comment on this code?

I am really interested in including this code in Spark.

But I have some concerns about persistence:
1. When we extend Receiver and call store, is it a blocking call? Does it return
only when Spark has stored the RDD as requested (i.e. replicated or on disk)?
Or is there some buffer that allows data loss? In the latter case, can we have a
callback telling us when it is safe to proceed with storing offsets?

2. I saw you implemented some rate limiting.
Can you clarify how it works? Given that the network receiver pulls data as fast
as it can while you limit the data in the BlockManager, what happens to the
excess data? Is it discarded? And if not, what happens?

There are a lot of open questions about how to make streaming reliable, and I
have plenty of questions off-list.
But I do not know how to improve the code without Spark support.


> On 21 Aug 2014, at 16:17, Evgeniy Shishkin  wrote:
> 
> Hello,
> 
> we are glad to announce yet another kafka input stream.
> 
> Available at https://github.com/wgnet/spark-kafka-streaming
> 
> It is used in production for about 3 months.
> We will be happy to hear your feedback.
> 
> Custom Spark Kafka consumer based on Kafka SimpleConsumer API.
> 
> Features
> 
>   • discover kafka metadata from zookeeper (more reliable than from 
> brokers, does not depend on broker list changes)
>   • reding from multiple topics
>   • reliably handles leader election and topic reassignment
>   • saves offsets and stream metadata in hbase (more robust than 
> zookeeper)
>   • supports metrics via spark metrics mechanism (jmx, graphite, etc.)
> Todo
> 
>   • abstract offset storage
>   • time controlled offsets commit
>   • refactor kafka message to rdd elements transformation (flatmapper 
> method)
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL : how to find element where a field is in a given set

2014-08-29 Thread Michael Armbrust
This feature was not part of that version.  It will be in 1.1.


On Fri, Aug 29, 2014 at 12:33 PM, Jaonary Rabarisoa 
wrote:

>
> 1.0.2
>
>
> On Friday, August 29, 2014, Michael Armbrust 
> wrote:
>
>> What version are you using?
>>
>>
>>
>> On Fri, Aug 29, 2014 at 2:22 AM, Jaonary Rabarisoa 
>> wrote:
>>
>>> Still not working for me. I got a compilation error : *value in is not
>>> a member of Symbol.* Any ideas ?
>>>
>>>
>>> On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 To pass a list to a variadic function you can use the type ascription
 :_*

 For example:

 val longList = Seq[Expression]("a", "b", ...)
 table("src").where('key in (longList: _*))

 Also, note that I had to explicitly specify Expression as the type
 parameter of Seq to ensure that the compiler converts "a" and "b" into
 Spark SQL expressions.




 On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa 
 wrote:

> ok, but what if I have a long list do I need to hard code like this
> every element of my list of is there a function that translate a list into
> a tuple ?
>
>
> On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> You don't need the Seq, as in is a variadic function.
>>
>> personTable.where('name in ("foo", "bar"))
>>
>>
>>
>> On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa > > wrote:
>>
>>> Hi all,
>>>
>>> What is the expression that I should use with spark sql DSL if I
>>> need to retreive
>>> data with a field in a given set.
>>> For example :
>>>
>>> I have the following schema
>>>
>>> case class Person(name: String, age: Int)
>>>
>>> And I need to do something like :
>>>
>>> personTable.where('name in Seq("foo", "bar")) ?
>>>
>>>
>>> Cheers.
>>>
>>>
>>> Jaonary
>>>
>>
>>
>

>>>
>>
>


What is the better data structure in an RDD

2014-08-29 Thread cjwang
I need some advice regarding how data should be stored in an RDD. I have millions
of records, called "Measures". They are bucketed with keys of String type.
I wonder if I need to store them as RDD[(String, Measure)] or RDD[(String,
Iterable[Measure])], and why?

Data in each bucket are not related most of the time. The operations that I
often need to do are (see the sketch after the list):

- Sort the Measures in each bucket separately
- Aggregate the Measures in each bucket separately
- Combine Measures in two RDDs into one based on their bucket keys
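
For reference, a minimal sketch of those three operations on the flat RDD[(String, Measure)]
representation (the Measure fields and the sort key are hypothetical; grouping is only done
where per-bucket ordering is actually needed, since reduceByKey/join avoid materializing
whole buckets):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

case class Measure(ts: Long, value: Double)   // hypothetical fields

def example(a: RDD[(String, Measure)], b: RDD[(String, Measure)]) = {
  val sorted     = a.groupByKey().mapValues(_.toSeq.sortBy(_.ts))  // sort each bucket
  val aggregated = a.mapValues(_.value).reduceByKey(_ + _)         // aggregate each bucket
  val combined   = a.join(b)      // combine two RDDs on the bucket key
  (sorted, aggregated, combined)
}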






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-better-data-structure-in-an-RDD-tp13159.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
I create my DStream very simply as:
val kInMsg = KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))
.
.
eventually, before I operate on the DStream, I repartition it:
kInMsg.repartition(512)

Are you saying that ^^ repartition doesn't split my dstream into multiple
smaller streams? Should I manually create multiple DStreams like this?:
val kInputs = (1 to 10).map {_=> KafkaUtils.createStream()}

Then I apply some custom logic to it as:
val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap)) //where
normalizeLog takes a String and Map of regex and returns a string
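
For what it's worth, repartition() returns a new DStream and leaves the original untouched,
so the repartitioned stream is normally the one the map is applied to (a sketch reusing the
names above):

val kInMsg = KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))

// repartition() does not modify kInMsg in place; operate on its result.
val repartitioned = kInMsg.repartition(512)
val outdata = repartitioned.map(x => normalizeLog(x._2, configMap))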

In my case, I think I have traced the issue to the receiver executor being
killed by Yarn:
14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
node-dn1-4-acme.com: remote Akka client disassociated

This be the root cause?
http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
https://issues.apache.org/jira/browse/SPARK-2121





On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen  wrote:

> Are you using multiple Dstreams? repartitioning does not affect how
> many receivers you have. It's on 2 nodes for each receiver. You need
> multiple partitions in the queue, each consumed by a DStream, if you
> mean to parallelize consuming the queue.
>
> On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith  wrote:
> > Good to see I am not the only one who cannot get incoming Dstreams to
> > repartition. I tried repartition(512) but still no luck - the app
> > stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
> > release notes for 1.0.1 and 1.0.2, I don't see anything that says this
> > was an issue and has been fixed.
> >
> > How do I debug the repartition() statement to see what's the flow
> > after the job hits that statement?
> >
> >
> > On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat 
> wrote:
> >> Chris,
> >>
> >> I did the Dstream.repartition mentioned in the document on parallelism
> in
> >> receiving, as well as set "spark.default.parallelism" and it still uses
> only
> >> 2 nodes in my cluster.  I notice there is another email thread on the
> same
> >> topic:
> >>
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
> >>
> >> My code is in Java and here is what I have:
> >>
> >>JavaPairReceiverInputDStream messages =
> >>
> >> KafkaUtils.createStream(ssc, zkQuorum,
> >> "cse-job-play-consumer", kafkaTopicMap);
> >>
> >> JavaPairDStream newMessages =
> >> messages.repartition(partitionSize);// partitionSize=30
> >>
> >> JavaDStream lines = newMessages.map(new
> >> Function, String>() {
> >> ...
> >>
> >> public String call(Tuple2 tuple2) {
> >>   return tuple2._2();
> >> }
> >>   });
> >>
> >> JavaDStream words = lines.flatMap(new
> >> MetricsComputeFunction()
> >> );
> >>
> >> JavaPairDStream wordCounts = words.mapToPair(
> >> new PairFunction() {
> >>...
> >> }
> >> );
> >>
> >>  wordCounts.foreachRDD(new Function Integer>,
> >> Void>() {...});
> >>
> >> Thanks,
> >> Bharat
> >>
> >>
> >>
> >> --
> >> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: Spark Streaming reset state

2014-08-29 Thread Christophe Sebastien
You can use a tuple associating a timestamp with your running sum, and have
COMPUTE_RUNNING_SUM reset the running sum to zero when the timestamp is
more than 5 minutes old.
You'll still have a leak doing so if your keys keep changing, though.
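
A minimal sketch of that idea (in Scala for brevity even though the original job is Java;
the Long value type, the 5-minute window and the names below are assumptions):

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

val resetIntervalMs = 5 * 60 * 1000L   // assumed 5-minute window

// State is (windowStart, runningSum); once the window is stale the sum restarts
// from the values of the current batch only.
val computeRunningSum = (newValues: Seq[Long], state: Option[(Long, Long)]) => {
  val now = System.currentTimeMillis()
  val (windowStart, runningSum) = state.getOrElse((now, 0L))
  if (now - windowStart > resetIntervalMs) Some((now, newValues.sum))
  else Some((windowStart, runningSum + newValues.sum))
}

def withReset(counts: DStream[(String, Long)]): DStream[(String, (Long, Long))] =
  counts.updateStateByKey(computeRunningSum)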

--Christophe


2014-08-29 9:00 GMT-07:00 Eko Susilo :

>
> so the "codes" currently holding RDD containing codes and its respective
> counter. I would like to find a way to reset those RDD after some period of
> time.
>
>
> On Fri, Aug 29, 2014 at 5:55 PM, Sean Owen  wrote:
>
>> "codes" is a DStream, not an RDD. The remember() method controls how
>> long Spark Streaming holds on to the RDDs itself. Clarify what you
>> mean by "reset"? codes provides a stream of RDDs that contain your
>> computation over a window of time. New RDDs come with the computation
>> over new data.
>>
>> On Fri, Aug 29, 2014 at 4:30 PM, Eko Susilo
>>  wrote:
>> > Hi all,
>> >
>> > I would like to ask some advice about resetting spark stateful
>> operation.
>> > so i tried like this:
>> >
>> > JavaStreamingContext jssc = new JavaStreamingContext(context, new
>> > Duration(5000));
>> > jssc.remember(Duration(5*60*1000));
>> > jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);
>> > JavaPairReceiverInputDStream messages =
>> >(JavaPairReceiverInputDStream)
>> > KafkaUtils.createStream(jssc, "localhost:2181", "test-consumer-group",
>> > topicMap);
>> > JavaPairDStream windowed= messages.window(WINDOW_LENGTH,
>> > SLIDE_INTERVAL);
>> > JavaDStream lines = windowed.map(new Function> > String>, LogEntry>() { @Override public LogEntry call(Tuple2> String>
>> > tuple2) { LogEntry _Result=Utils.parseLine(tuple2._2()); return
>> _Result; }
>> > }).filter(Functions.FILTER_LOG_ENTRY).cache();
>> >
>> > JavaPairDStream codes=lines.mapToPair(Functions.GET_CODE).
>> > reduceByKey(Functions.SUM_REDUCER).
>> > updateStateByKey(COMPUTE_RUNNING_SUM);
>> > i thought by setting the remember to 5 minutes, the "codes" RDD that
>> derived
>> > from messages would also be reseted in 5 minutes, but in fact no.
>> >
>> > Is there any way to reset the "codes" RDD after a period of time (5
>> > minutes)?
>> >
>> > Thanks
>> >
>> >
>> >
>> > --
>> > Best Regards,
>> > Eko Susilo
>>
>
>
>
> --
> Best Regards,
> Eko Susilo
>


Re: Possible to make one executor be able to work on multiple tasks simultaneously?

2014-08-29 Thread Victor Tso-Guillen
Any more thoughts on this? I'm not sure how to do this yet.


On Fri, Aug 29, 2014 at 12:10 PM, Victor Tso-Guillen 
wrote:

> Standalone. I'd love to tell it that my one executor can simultaneously
> serve, say, 16 tasks at once for an arbitrary number of distinct jobs.
>
>
> On Fri, Aug 29, 2014 at 11:29 AM, Matei Zaharia 
> wrote:
>
>> Yes, executors run one task per core of your machine by default. You can
>> also manually launch them with more worker threads than you have cores.
>> What cluster manager are you on?
>>
>> Matei
>>
>> On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com)
>> wrote:
>>
>>  I'm thinking of local mode where multiple virtual executors occupy the
>> same vm. Can we have the same configuration in spark standalone cluster
>> mode?
>>
>>
>


Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Ok, so I did this:
val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1))
}
val kInMsg = ssc.union(kInStreams)
val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap))

This has improved parallelism. Earlier I would only get a "Stream 0". Now I
have "Streams [0-9]". Of course, since the kafka topic has only three
partitions, only three of those streams are active but I am seeing more
blocks being pulled across the three streams total that what one was doing
earlier. Also, four nodes are actively processing tasks (vs only two
earlier) now which actually has me confused. If "Streams" are active only
on 3 nodes then how/why did a 4th node get work? If a 4th got work why
aren't more nodes getting work?






On Fri, Aug 29, 2014 at 4:11 PM, Tim Smith  wrote:

> I create my DStream very simply as:
> val kInMsg =
> KafkaUtils.createStream(ssc,"zkhost1:2181/zk_kafka","testApp",Map("rawunstruct"
> -> 8))
> .
> .
> eventually, before I operate on the DStream, I repartition it:
> kInMsg.repartition(512)
>
> Are you saying that ^^ repartition doesn't split by dstream into multiple
> smaller streams? Should I manually create multiple Dstreams like this?:
> val kInputs = (1 to 10).map {_=> KafkaUtils.createStream()}
>
> Then I apply some custom logic to it as:
> val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap)) //where
> normalizeLog takes a String and Map of regex and returns a string
>
> In my case, I think I have traced the issue to the receiver executor being
> killed by Yarn:
> 14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
> node-dn1-4-acme.com: remote Akka client disassociated
>
> This be the root cause?
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
> https://issues.apache.org/jira/browse/SPARK-2121
>
>
>
>
>
> On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen  wrote:
>
>> Are you using multiple Dstreams? repartitioning does not affect how
>> many receivers you have. It's on 2 nodes for each receiver. You need
>> multiple partitions in the queue, each consumed by a DStream, if you
>> mean to parallelize consuming the queue.
>>
>> On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith  wrote:
>> > Good to see I am not the only one who cannot get incoming Dstreams to
>> > repartition. I tried repartition(512) but still no luck - the app
>> > stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
>> > release notes for 1.0.1 and 1.0.2, I don't see anything that says this
>> > was an issue and has been fixed.
>> >
>> > How do I debug the repartition() statement to see what's the flow
>> > after the job hits that statement?
>> >
>> >
>> > On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat 
>> wrote:
>> >> Chris,
>> >>
>> >> I did the Dstream.repartition mentioned in the document on parallelism
>> in
>> >> receiving, as well as set "spark.default.parallelism" and it still
>> uses only
>> >> 2 nodes in my cluster.  I notice there is another email thread on the
>> same
>> >> topic:
>> >>
>> >>
>> http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
>> >>
>> >> My code is in Java and here is what I have:
>> >>
>> >>JavaPairReceiverInputDStream messages =
>> >>
>> >> KafkaUtils.createStream(ssc, zkQuorum,
>> >> "cse-job-play-consumer", kafkaTopicMap);
>> >>
>> >> JavaPairDStream newMessages =
>> >> messages.repartition(partitionSize);// partitionSize=30
>> >>
>> >> JavaDStream lines = newMessages.map(new
>> >> Function, String>() {
>> >> ...
>> >>
>> >> public String call(Tuple2 tuple2) {
>> >>   return tuple2._2();
>> >> }
>> >>   });
>> >>
>> >> JavaDStream words = lines.flatMap(new
>> >> MetricsComputeFunction()
>> >> );
>> >>
>> >> JavaPairDStream wordCounts = words.mapToPair(
>> >> new PairFunction() {
>> >>...
>> >> }
>> >> );
>> >>
>> >>  wordCounts.foreachRDD(new Function> Integer>,
>> >> Void>() {...});
>> >>
>> >> Thanks,
>> >> Bharat
>> >>
>> >>
>> >>
>> >> --
>> >> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
>> >> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>
>


RE: [Streaming] Akka-based receiver with messages defined in uploaded jar

2014-08-29 Thread Anton Brazhnyk
Just checked it with 1.0.2
Still same exception.

From: Anton Brazhnyk [mailto:anton.brazh...@genesys.com]
Sent: Wednesday, August 27, 2014 6:46 PM
To: Tathagata Das
Cc: user@spark.apache.org
Subject: RE: [Streaming] Akka-based receiver with messages defined in uploaded 
jar

Sorry for the delay with answer – was on vacation.
As I said I was using modified version of launcher from the example. 
Modification is just about setting spark master URL in the code to not use 
run-example script.
The launcher itself was in the attached zip (attaching it once more) as 
ActorWordCount object.

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Tuesday, August 05, 2014 11:32 PM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded 
jar

How are you launching/submitting the program? Using spark-submit? Or some other 
script (can you provide that)?

TD

On Tue, Aug 5, 2014 at 6:52 PM, Anton Brazhnyk 
mailto:anton.brazh...@genesys.com>> wrote:
Went through it once again to leave the only modification in question. Still 
same exception.
I hope sources as zip file (instead of github) still can be tolerated. :)

Here is the stacktrace generated with this sources:
14/08/05 18:45:54 DEBUG RecurringTimer: Callback for BlockGenerator called at 
time 1407289554800
14/08/05 18:45:54 ERROR Remoting: 
org.apache.spark.examples.streaming.CustomMessage
java.lang.ClassNotFoundException: 
org.apache.spark.examples.streaming.CustomMessage
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
at 
akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at 
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at 
akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at 
akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
at 
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
-Original Message-
From: Tathagata Das 
[mailto:tathagata.das1...@gmail.com]
Sent: Tuesday, August 05, 2014 5:42 PM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded 
jar

 Can you show us the modified version. The reason could very well be what you 
suggest, but I want to understand what conditions lead to this.

TD

On Tue, Aug 5, 2014 at 3:55 PM, Anton Brazhnyk 
mailto:anton.brazh...@genesys.com>> wrote:
> Greetings,
>
>
>
> I modified ActorWordCount example a little and it uses simple case
> class as the message for Streaming instead of the prim

Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

2014-08-29 Thread Tathagata Das
Can you try adding the JAR to the class path of the executors directly, by
setting the config "spark.executor.extraClassPath" in the SparkConf? See
Configuration page -
http://spark.apache.org/docs/latest/configuration.html#runtime-environment

I think what you guessed is correct. The Akka actor system is not aware of
the classes that are dynamically added when the custom jar is added with
setJar.

TD

On Fri, Aug 29, 2014 at 6:44 PM, Anton Brazhnyk 
wrote:

>  Just checked it with 1.0.2
>
> Still same exception.
>
>
>
> *From:* Anton Brazhnyk [mailto:anton.brazh...@genesys.com]
> *Sent:* Wednesday, August 27, 2014 6:46 PM
> *To:* Tathagata Das
> *Cc:* user@spark.apache.org
> *Subject:* RE: [Streaming] Akka-based receiver with messages defined in
> uploaded jar
>
>
>
> Sorry for the delay with the answer; I was on vacation.
>
> As I said, I was using a modified version of the launcher from the example.
> The modification only sets the Spark master URL in the code so that the
> run-example script is not needed.
>
> The launcher itself is in the attached zip (attaching it once more) as the
> ActorWordCount object.
>
>
>
> *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com
> ]
> *Sent:* Tuesday, August 05, 2014 11:32 PM
> *To:* Anton Brazhnyk
> *Cc:* user@spark.apache.org
> *Subject:* Re: [Streaming] Akka-based receiver with messages defined in
> uploaded jar
>
>
>
> How are you launching/submitting the program? Using spark-submit? Or some
> other script (can you provide that)?
>
>
>
> TD
>
>
>
> On Tue, Aug 5, 2014 at 6:52 PM, Anton Brazhnyk 
> wrote:
>
> I went through it once again and kept only the modification in question.
> Still the same exception.
> I hope sources as a zip file (instead of GitHub) can still be tolerated. :)
>
> Here is the stack trace generated with these sources:
> 14/08/05 18:45:54 DEBUG RecurringTimer: Callback for BlockGenerator called
> at time 1407289554800
> 14/08/05 18:45:54 ERROR Remoting:
> org.apache.spark.examples.streaming.CustomMessage
> java.lang.ClassNotFoundException:
> org.apache.spark.examples.streaming.CustomMessage
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:270)
> at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
> at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
> at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
> at scala.util.Try$.apply(Try.scala:161)
> at akka.serialization.Serialization.deserialize(Serialization.scala:98)
> at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
> at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
> at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
> at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
> at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> -Original Message-
> From: Tathagata Das [mailto:tathagata.das1...@gmai

What does "appMasterRpcPort: -1" indicate ?

2014-08-29 Thread Tao Xiao
I'm using CDH 5.1.0, which bundles Spark 1.0.0 with it.

Following "How-to: Run a Simple Apache Spark App in CDH 5", I tried to submit
my job in local mode, Spark Standalone mode, and YARN mode. I successfully
submitted my job in local mode and Standalone mode; however, I noticed the
following messages printed on the console when I submitted my job in YARN mode:

14/08/29 22:27:29 INFO Client: Submitting application to ASM
14/08/29 22:27:29 INFO YarnClientImpl: Submitted application
application_1406949333981_0015
14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: 0
appStartTime: 1409365649836
yarnAppState: RUNNING

The job finished successfully and produced correct results.
But I'm not sure what those messages mean. Does "appMasterRpcPort: -1" indicate
an error or an exception?

Thanks


Re: Too many open files

2014-08-29 Thread Ye Xianjin
Oops, the last reply didn't go to the user list. Mail app's fault.

Shuffling happens across the cluster, so you need to change the limit on all
the nodes in the cluster.
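
As a side note, here is a minimal sketch of an application-side setting that
can reduce how many shuffle files are open at once. This assumes Spark 1.0.x,
where spark.shuffle.consolidateFiles is available (it is off by default);
raising the open-file limit on every node is still the main fix.

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: consolidating shuffle output makes each executor write one
    // set of shuffle files per core instead of one per map task.
    val conf = new SparkConf()
      .setAppName("DistinctCountByKey")
      .set("spark.shuffle.consolidateFiles", "true")
    val sc = new SparkContext(conf)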



Sent from my iPhone

> On Aug 30, 2014, at 3:10, Sudha Krishna  wrote:
> 
> Hi,
> 
> Thanks for your response. Do you know if I need to change this limit on all 
> the cluster nodes or just the master?
> Thanks
> 
>> On Aug 29, 2014 11:43 AM, "Ye Xianjin"  wrote:
>> A limit of 1024 open files is most likely too small for Linux machines in
>> production. Try setting it to 65536, or unlimited if you can. The "too many
>> open files" error occurs because a shuffle creates a lot of shuffle files (if
>> I'm wrong, please correct me):
>> 
>> Sent from my iPhone
>> 
>> > On Aug 30, 2014, at 2:06, SK  wrote:
>> >
>> > Hi,
>> >
>> > I am having the same problem reported by Michael. I am trying to open 30
>> > files. ulimit -n  shows the limit is 1024. So I am not sure why the program
>> > is failing with  "Too many open files" error. The total size of all the 30
>> > files is 230 GB.
>> > I am running the job on a cluster with 10 nodes, each having 16 GB. The
>> > error appears to be happening at the distinct() stage.
>> >
>> > Here is my program. In the following code, are all the 10 nodes trying to
>> > open all of the 30 files or are the files distributed among the 10 nodes?
>> >
>> >    val baseFile = "/mapr/mapr_dir/files_2013apr*"
>> >    val x = sc.textFile(baseFile).map { line =>
>> >      val fields = line.split("\t")
>> >      (fields(11), fields(6))
>> >    }.distinct().countByKey()
>> >    val xrdd = sc.parallelize(x.toSeq)
>> >    xrdd.saveAsTextFile(...)
>> >
>> > Instead of using the glob *, I guess I can try using a for loop to read the
>> > files one by one if that helps, but not sure if there is a more efficient
>> > solution.
>> >
>> > The following is the error transcript:
>> >
>> > Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent
>> > failure: Exception failure in TID 902 on host 192.168.13.11:
>> > java.io.FileNotFoundException:
>> > /tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open
>> > files)
>> > java.io.FileOutputStream.open(Native Method)
>> > java.io.FileOutputStream.(FileOutputStream.java:221)
>> > org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
>> > org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
>> > org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
>> > org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
>> > scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> > org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>> > org.apache.spark.scheduler.Task.run(Task.scala:51)
>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> > java.lang.Thread.run(Thread.java:744) Driver stacktrace:
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context: 
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >


SparkSQL HiveContext No Suitable Driver / Cannot Find Driver

2014-08-29 Thread Denny Lee
My issue is similar to the issue as noted 
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccadoad2ks9_qgeign5-w7xogmrotrlbchvfukctgstj5qp9q...@mail.gmail.com%3E.

Currently using Spark 1.1 (grabbed from git two days ago) and Hive 0.12 with my
metastore in MySQL. If I run any HiveContext statements, it results in a "cannot
find the driver in CLASSPATH" error. If I include the driver via --jars, it
gives me a "no suitable driver" error.

Any ideas on how to get the Hive context to work here?

Thanks!
Denny



Re: SparkSQL HiveContext No Suitable Driver / Cannot Find Driver

2014-08-29 Thread Denny Lee
Oh, I forgot to add the managed libraries and the Hive libraries to the
CLASSPATH. As soon as I did that, we were good to go.
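
For anyone hitting the same thing, a minimal sketch of putting the metastore
JDBC driver on the executor classpath and then exercising the HiveContext; the
connector jar path below is a placeholder, not the path from this setup:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    // Sketch only: the MySQL connector jar path is a placeholder. The driver
    // side usually needs the jar on its classpath at launch time (for example
    // spark-submit --driver-class-path <jar>), since setting it in SparkConf
    // after the driver JVM has started is too late.
    val mysqlJar = "/opt/jars/mysql-connector-java-5.1.31.jar"
    val conf = new SparkConf()
      .setAppName("HiveContextCheck")
      .set("spark.executor.extraClassPath", mysqlJar)

    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("SHOW TABLES").collect().foreach(println)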



On August 29, 2014 at 22:55:47, Denny Lee (denny.g@gmail.com) wrote:

My issue is similar to the issue as noted 
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccadoad2ks9_qgeign5-w7xogmrotrlbchvfukctgstj5qp9q...@mail.gmail.com%3E.

Currently using Spark 1.1 (grabbed from git two days ago) and Hive 0.12 with my
metastore in MySQL. If I run any HiveContext statements, it results in a "cannot
find the driver in CLASSPATH" error. If I include the driver via --jars, it
gives me a "no suitable driver" error.

Any ideas on how to get the Hive context to work here?

Thanks!
Denny