Re: Research ideas using spark

2015-07-15 Thread Vineel Yalamarthy
Hi Daniel

Well said

Regards
Vineel

On Tue, Jul 14, 2015, 6:11 AM Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> Hi Shahid,
> To be honest I think this question is better suited for Stack Overflow
> than for a PhD thesis.
>
> On Tue, Jul 14, 2015 at 7:42 AM, shahid ashraf  wrote:
>
>> hi
>>
>> I have a 10-node cluster. I loaded the data onto HDFS, and the number of
>> partitions I get is 9. I am running a Spark application, and it gets stuck on
>> one of the tasks. Looking at the UI, it seems the application is not using all
>> nodes for the calculations. Attached is a screenshot of the tasks; it seems
>> tasks are put on each node more than once. Of the 9 tasks, 8 complete in
>> 7-8 minutes and one takes around 30 minutes, which delays the
>> results.
>>
>>
>> On Tue, Jul 14, 2015 at 10:48 AM, Shashidhar Rao <
>> raoshashidhar...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am doing my PhD thesis on large-scale machine learning, e.g. online
>>> learning, batch and mini-batch learning.
>>>
>>> Could somebody help me with ideas, especially in the context of Spark and
>>> the above learning methods?
>>>
>>> I am looking for ideas such as improvements to existing algorithms, or
>>> new features to implement, especially learning methods and algorithms that
>>> have not been implemented yet.
>>>
>>> If somebody could help me with some ideas, it would really accelerate my
>>> work.
>>>
>>> A few pointers to research papers on Spark or Mahout would also help.
>>>
>>> Thanks in advance.
>>>
>>> Regards
>>>
>>
>>
>>
>> --
>> with Regards
>> Shahid Ashraf
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>


Re: Problem in Understanding concept of Physical Cores

2015-07-15 Thread Aniruddh Sharma
Hi TD,

I request your guidance on the 5 queries below. Here is the context in which
I will evaluate your response.

a) I need to decide whether to deploy Spark in Standalone mode or on YARN.
It seems to me that Spark on YARN is more parallel than Standalone mode
(given the same number of physical cores), as it is possible to increase the
number of execution threads on YARN with the --executor-cores option.
b) I also need to understand the following, which is not clear to me. Other
people on the mailing list are raising the same question in other words for
different cases while tuning jobs. Theoretically a JVM can support thousands
of threads. But in the context of Spark, what is the advisable ratio between
physical cores, threads created, and partitions created? If you find this
relevant and important, perhaps there could be a better link that explains
this for both YARN and Standalone mode.

Thanks and Regards
Aniruddh
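
A note on the scheduling model discussed in this thread: a fixed number of task slots works through a larger number of tasks, and each slot runs one task at a time. The sketch below imitates that with a plain Python thread pool, using the 4 threads and 40 partitions from Query 1. It is only an analogy, not Spark code; run_tasks and min_waves are hypothetical helper names.

```python
import math
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def min_waves(num_tasks, num_slots):
    """Lower bound on the number of rounds needed if every task takes equally long."""
    return math.ceil(num_tasks / num_slots)


def run_tasks(num_slots, num_tasks):
    """Run num_tasks trivial tasks on num_slots threads; return results and peak concurrency."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task(index):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.001)  # stands in for processing one partition
        with lock:
            running -= 1
        return index

    # The pool has exactly num_slots worker threads, so no more than
    # num_slots tasks can be in flight at the same time.
    with ThreadPoolExecutor(max_workers=num_slots) as pool:
        results = list(pool.map(task, range(num_tasks)))
    return results, peak


results, peak = run_tasks(num_slots=4, num_tasks=40)
print(len(results), "tasks done, peak concurrency", peak)
```

In this sketch at most 4 tasks are ever in flight, and a worker picks up the next pending task as soon as it finishes its current one, so the tasks need not advance in lockstep groups of 4.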

On Fri, Jul 10, 2015 at 11:45 AM, Aniruddh Sharma 
wrote:

> Hi TD,
>
> Thanks for the elaboration. I have further doubts based on further tests that
> I did after your guidance.
>
> Case 1: Standalone Spark
> In standalone mode, as you explained, the master in spark-submit is local[*]
> implicitly, so it creates as many threads as the VM has cores. But the user
> can control the number of partitions to be created, and tasks will be
> created in accordance with the number of partitions.
>
> Query 1: If I have 4 cores, then 4 threads will be created, but if I give
> 40 partitions to my data, then 40 tasks will be created, which need to be
> executed on 4 threads. Does it work this way: 4 threads execute 4 tasks
> (out of 40) in parallel, and when the first set of tasks completes, they
> pick the next 4 tasks, and so on? That is, 4 tasks run concurrently and the
> rest of the tasks run in sequence as earlier ones complete.
>
> Query 2: When we pass total-num-cores to Spark in Standalone mode, it
> seems the number of threads does not increase. When I execute
> sc.defaultParallelism, the passed total-num-cores parameter does not seem
> to have any effect on it. So what exactly does this parameter mean? Does it
> control the number of threads, or does it tell the Spark Master to provide
> this many physical cores to this job? I mean, is this parameter relevant
> not for a single job, but when multiple jobs are running in the cluster, to
> tell the Spark scheduler not to overallocate resources to a single job?
> Also, does setting this parameter guarantee any behavior, or is it only a
> hint for the Spark scheduler?
>
>
> Case 2: Spark on Yarn
> In Spark on YARN, it seems that the number of threads created is not based
> on the number of underlying physical cores.
>
> Query 3: Instead it seems to be (defaultMinPartition * executor-cores). Is
> this understanding correct? If yes, does it mean the developer can control
> the number of threads requested from Spark by passing the executor-cores
> option (which was not there in Standalone mode, as the number of threads was
> based on the number of physical cores)? Is there a special reason for this
> kind of difference?
>
> Query 4: Also, it seems there is a restriction on the value I can pass in
> the executor-cores option, which seems to depend on the underlying physical
> cores. For example, if I have 4 cores and I pass 20, it works, but if I
> pass 100, it does not work. So it seems the actual number of threads that
> can be created inside the JVM is still limited by the number of physical
> cores, but it can be controlled by the executor-cores option. Kindly
> elaborate on the best practice for how many threads to request based on
> physical cores, and how physical cores limit this behavior.
>
> Query 5: Is there a reason for the difference in behavior between
> total-num-cores (does not create threads) in Standalone mode and
> executor-cores (creates threads) in YARN mode? It seems that in YARN mode we
> can create more threads in the same executor JVM compared to Standalone mode
> for the same number of physical cores.
>
> Thanks and Regards
> Aniruddh
>
>
>
>
> On Thu, Jul 9, 2015 at 4:30 PM, Tathagata Das  wrote:
>
>> Query 1) What Spark runs is tasks in task slots; whatever the mapping
>> of tasks to physical cores is, it does not matter. If there are two task slots
>> (2 threads in local mode, or an executor with 2 task slots in distributed
>> mode), it can only run two tasks concurrently. That is true even if the
>> task is really not doing much. There is no multiplexing going on between
>> tasks and task slots. So to answer your query 1, there is 1 thread that is
>> permanently allocated to the receiver task (a long running task) even if it
>> does not do much. There is no thread left to process the data that is being
>> received.
>>
>> Query 2) I think this is already explained above. The receiver task is
>> taking the only available slot, le

spark sql - group by constant column

2015-07-15 Thread Lior Chaga
Hi,

Facing a bug with group by in SparkSQL (version 1.4).
Registered a JavaRDD with object containing integer fields as a table.

Then I'm trying to do a group by, with a constant value in the group by
fields:

SELECT primary_one, primary_two, 10 as num, SUM(measure) as total_measures
FROM tbl
GROUP BY primary_one, primary_two, num


I get the following exception:
org.apache.spark.sql.AnalysisException: cannot resolve 'num' given input
columns measure, primary_one, primary_two

Tried both with HiveContext and SqlContext.
The odd thing is that this kind of query actually works for me in a project
I'm working on, but there I have another bug (the group by does not yield
the expected results).

The only reason I can think of is that maybe in my real project, the
context configuration is different.
In my above example the configuration of the HiveContext is empty.

In my real project, the configuration is shown below.
Any ideas?

Thanks,
Lior

Hive context configuration in project:
"(mapreduce.jobtracker.jobhistory.task.numberprogresssplits,12)"
"(nfs3.mountd.port,4242)"
"(mapreduce.tasktracker.healthchecker.script.timeout,60)"
"(yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms,1000)"
"(mapreduce.input.fileinputformat.input.dir.recursive,false)"
"(hive.orc.compute.splits.num.threads,10)"
"(mapreduce.job.classloader.system.classes,java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.)"
"(hive.auto.convert.sortmerge.join.to.mapjoin,false)"
"(hadoop.http.authentication.kerberos.principal,HTTP/_HOST@LOCALHOST)"
"(hive.exec.perf.logger,org.apache.hadoop.hive.ql.log.PerfLogger)"
 "(hive.mapjoin.lazy.hashtable,true)"
 "(mapreduce.framework.name,local)"
 "(hive.exec.script.maxerrsize,10)"
 "(dfs.namenode.checkpoint.txns,100)"
 "(tfile.fs.output.buffer.size,262144)"
 "(yarn.app.mapreduce.am.job.task.listener.thread-count,30)"
 "(mapreduce.tasktracker.local.dir.minspacekill,0)"
 "(hive.support.concurrency,false)"
 "(fs.s3.block.size,67108864)"
 "(hive.script.recordwriter,org.apache.hadoop.hive.ql.exec.TextRecordWriter)"
 "(hive.stats.retries.max,0)"
 "(hadoop.hdfs.configuration.version,1)"
 "(dfs.bytes-per-checksum,512)"
 "(fs.s3.buffer.dir,${hadoop.tmp.dir}/s3)"
 "(mapreduce.job.acl-view-job, )"
 "(hive.typecheck.on.insert,true)"
 "(mapreduce.jobhistory.loadedjobs.cache.size,5)"
 "(mapreduce.jobtracker.persist.jobstatus.hours,1)"
 "(hive.unlock.numretries,10)"
 "(dfs.namenode.handler.count,10)"
 "(mapreduce.input.fileinputformat.split.minsize,1)"
 "(hive.plan.serialization.format,kryo)"
 "(dfs.datanode.failed.volumes.tolerated,0)"
 "(yarn.resourcemanager.container.liveness-monitor.interval-ms,60)"
 "(yarn.resourcemanager.amliveliness-monitor.interval-ms,1000)"
 "(yarn.resourcemanager.client.thread-count,50)"
 "(io.seqfile.compress.blocksize,100)"
 "(mapreduce.tasktracker.http.threads,40)"
 "(hive.explain.dependency.append.tasktype,false)"
 "(dfs.namenode.retrycache.expirytime.millis,60)"
 "(dfs.namenode.backup.address,0.0.0.0:50100)"
 "(hive.hwi.listen.host,0.0.0.0)"
 "(dfs.datanode.data.dir,file://${hadoop.tmp.dir}/dfs/data)"
 "(dfs.replication,3)"
 "(mapreduce.jobtracker.jobhistory.block.size,3145728)"
 
"(dfs.secondary.namenode.kerberos.internal.spnego.principal,${dfs.web.authentication.kerberos.principal})"
 "(mapreduce.task.profile.maps,0-2)"
 "(fs.har.impl,org.apache.hadoop.hive.shims.HiveHarFileSystem)"
 "(hive.stats.reliable,false)"
 "(yarn.nodemanager.admin-env,MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX)"


Re: Java 8 vs Scala

2015-07-15 Thread 诺铁
I think different teams will have different answers to this question. My
team uses Scala and is happy with it.

On Wed, Jul 15, 2015 at 1:31 PM, Tristan Blakers 
wrote:

> We have had excellent results operating on RDDs using Java 8 with Lambdas.
> It’s slightly more verbose than Scala, but I haven’t found this an issue,
> and haven’t missed any functionality.
>
> The new DataFrame API makes the Spark platform even more language agnostic.
>
> Tristan
>
> On 15 July 2015 at 06:40, Vineel Yalamarthy 
> wrote:
>
>> Good question. Like you, many are in the same boat (coming from a Java
>> background). Looking forward to a response from the community.
>>
>> Regards
>> Vineel
>>
>> On Tue, Jul 14, 2015 at 2:30 PM, spark user wrote:
>>
>>> Hi All
>>>
>>> To start a new project in Spark, which technology is good: Java 8 or
>>> Scala?
>>>
>>> I am a Java developer. Can I start with Java 8, or do I need to learn Scala?
>>>
>>> Which one is the better technology for a quick start on a POC project?
>>>
>>> Thanks
>>>
>>> - su
>>>
>>
>>
>>
>> --
>>
>> Thanks and Regards,
>> Venkata Vineel, Student  ,School of Computing
>> Mobile : +1-385-2109-788
>>
>> -*Innovation is the ability to convert **ideas into invoices*
>>
>>
>


Random Forest Error

2015-07-15 Thread rishikesh
Hi

I am trying to train a Random Forest over my dataset. I have a binary
classification problem. When I call the train method as below

model = RandomForest.trainClassifier(data, numClasses=2,
categoricalFeaturesInfo={}, numTrees=3, featureSubsetStrategy="auto",
impurity='gini', maxDepth=4, maxBins=32)

I get the error

15/07/15 16:24:28 ERROR Executor: Exception in task 1.0 in stage 95.0 (TID
145)
java.lang.IllegalArgumentException: GiniAggregator given label 2.0 but
requires label < numClasses (= 2).

What argument am I missing or specifying incorrectly?

Thanks
Rishi



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Random-Forest-Error-tp23847.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Java 8 vs Scala

2015-07-15 Thread Ignacio Blasco
The main advantage of using Scala over Java 8 is being able to use an
interactive console (the Spark shell).

2015-07-15 9:27 GMT+02:00 诺铁 :

> I think different teams will have different answers to this question. My
> team uses Scala and is happy with it.
>
> On Wed, Jul 15, 2015 at 1:31 PM, Tristan Blakers 
> wrote:
>
>> We have had excellent results operating on RDDs using Java 8 with
>> Lambdas. It’s slightly more verbose than Scala, but I haven’t found this an
>> issue, and haven’t missed any functionality.
>>
>> The new DataFrame API makes the Spark platform even more language
>> agnostic.
>>
>> Tristan
>>
>> On 15 July 2015 at 06:40, Vineel Yalamarthy 
>> wrote:
>>
>>> Good question. Like you, many are in the same boat (coming from a Java
>>> background). Looking forward to a response from the community.
>>>
>>> Regards
>>> Vineel
>>>
>>> On Tue, Jul 14, 2015 at 2:30 PM, spark user <
>>> spark_u...@yahoo.com.invalid> wrote:
>>>
>>>> Hi All
>>>>
>>>> To start a new project in Spark, which technology is good: Java 8 or
>>>> Scala?
>>>>
>>>> I am a Java developer. Can I start with Java 8, or do I need to learn
>>>> Scala?
>>>>
>>>> Which one is the better technology for a quick start on a POC project?
>>>>
>>>> Thanks
>>>>
>>>> - su
>>>>
>>>
>>>
>>>
>>> --
>>>
>>> Thanks and Regards,
>>> Venkata Vineel, Student  ,School of Computing
>>> Mobile : +1-385-2109-788
>>>
>>> -*Innovation is the ability to convert **ideas into invoices*
>>>
>>>
>>
>


Re: Random Forest Error

2015-07-15 Thread Anas Sherwani
For the RandomForest classifier, labels should be within the range
[0, numClasses-1]. This means you have to map your labels to 0 and 1 instead
of 1 and 2.
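
The remapping described above can be sketched in plain Python. This is a minimal illustration, assuming the raw labels are 1.0 and 2.0 as in the error message; build_label_map is a hypothetical helper, not part of MLlib, and applying it to the actual training RDD is left out.

```python
def build_label_map(labels):
    """Map each distinct label to an index in [0, numClasses - 1]."""
    distinct = sorted(set(labels))
    return {label: float(index) for index, label in enumerate(distinct)}


# Hypothetical raw labels, using 1 and 2 as in the question.
raw_labels = [1.0, 2.0, 2.0, 1.0]

label_map = build_label_map(raw_labels)
remapped = [label_map[label] for label in raw_labels]
num_classes = len(label_map)

# Every remapped label is now strictly below num_classes.
print(label_map, remapped, num_classes)
```

The same mapping would be applied to each record's label before the data is passed to trainClassifier with numClasses=2.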



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Random-Forest-Error-tp23847p23848.html



Re: creating a distributed index

2015-07-15 Thread Jem Tucker
With regard to indexed structures in Spark, are there any alternatives to
IndexedRDD for more generic keys, including Strings?

Thanks

Jem

On Wed, Jul 15, 2015 at 7:41 AM Burak Yavuz  wrote:

> Hi Swetha,
>
> IndexedRDD is available as a package on Spark Packages.
>
> Best,
> Burak
>
> On Tue, Jul 14, 2015 at 5:23 PM, swetha  wrote:
>
>> Hi Ankur,
>>
>> Is IndexedRDD available in Spark 1.4.0? We would like to use this in Spark
>> Streaming to do lookups/updates/deletes in RDDs using keys by storing them
>> as key/value pairs.
>>
>> Thanks,
>> Swetha
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/creating-a-distributed-index-tp11204p23842.html
>>
>>
>


Re: spark sql - group by constant column

2015-07-15 Thread Lior Chaga
I found out the problem. Grouping by a constant column value is indeed
impossible.
The reason it was "working" in my project is that I gave the constant
column an alias that exists in the schema of the dataframe. The dataframe
contained a "data_timestamp" representing an hour, and I added to the
select a constant "data_timestamp" that represented the timestamp of the
day. And that was the cause for my original bug - I thought I was grouping
by the day timestamp, when I was actually grouping by each hour, and
therefore I got multiple rows for each of the group by combinations.
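
The effect described above can be reproduced in plain Python without Spark. The sketch below uses made-up rows in which data_timestamp holds an hour, and compares grouping by that existing column with grouping by a constant day value. The values and the sum_by helper are assumptions for illustration only.

```python
from collections import defaultdict

DAY = 0  # hypothetical constant standing in for the day timestamp

# Three hourly rows for the same (primary_one, primary_two) pair.
rows = [
    {"primary_one": 1, "primary_two": 1, "data_timestamp": 0, "measure": 10},
    {"primary_one": 1, "primary_two": 1, "data_timestamp": 3600, "measure": 20},
    {"primary_one": 1, "primary_two": 1, "data_timestamp": 7200, "measure": 30},
]


def sum_by(records, key_fn):
    """Sum the measure column per grouping key."""
    totals = defaultdict(int)
    for record in records:
        totals[key_fn(record)] += record["measure"]
    return dict(totals)


# What actually happened: the name resolved to the existing hourly column.
by_hour = sum_by(rows, lambda r: (r["primary_one"], r["primary_two"], r["data_timestamp"]))

# What was intended: one group per day.
by_day = sum_by(rows, lambda r: (r["primary_one"], r["primary_two"], DAY))

print(len(by_hour), "groups by hour;", len(by_day), "group by day")
```

Grouping by the hourly column yields one row per hour, while grouping by the constant collapses them into a single row, which matches the "multiple rows per combination" symptom.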

On Wed, Jul 15, 2015 at 10:09 AM, Lior Chaga  wrote:

> Hi,
>
> Facing a bug with group by in SparkSQL (version 1.4).
> Registered a JavaRDD with object containing integer fields as a table.
>
> Then I'm trying to do a group by, with a constant value in the group by
> fields:
>
> SELECT primary_one, primary_two, 10 as num, SUM(measure) as total_measures
> FROM tbl
> GROUP BY primary_one, primary_two, num
>
>
> I get the following exception:
> org.apache.spark.sql.AnalysisException: cannot resolve 'num' given input
> columns measure, primary_one, primary_two
>
> Tried both with HiveContext and SqlContext.
> The odd thing is that this kind of query actually works for me in a
> project I'm working on, but I have there another bug (the group by does not
> yield expected results).
>
> The only reason I can think of is that maybe in my real project, the
> context configuration is different.
> In my above example the configuration of the HiveContext is empty.
>
> In my real project, the configuration is shown below.
> Any ideas?
>
> Thanks,
> Lior
>
>
>


[SparkR] creating dataframe from json file

2015-07-15 Thread jianshu
hi all, 

Not sure whether this is the right venue to ask. If not, please point me to the
right group, if there is one.

I'm trying to create a Spark DataFrame from a JSON file using jsonFile(). The
call was successful, and I can see the DataFrame created. The JSON file
contains a number of tweets obtained from the Twitter API. I am particularly
interested in pulling out the hashtags contained in the tweets. If I use
printSchema(), the schema is something like:

root
 |-- id_str: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- text: string (nullable = true)

showDF() would show something like this :

+--------------------+
|            hashtags|
+--------------------+
|              List()|
|List([List(125, 1...|
|              List()|
|List([List(0, 3),...|
|List([List(76, 86...|
|              List()|
|List([List(74, 84...|
|              List()|
|              List()|
|              List()|
|List([List(85, 96...|
|List([List(125, 1...|
|              List()|
|              List()|
|              List()|
|              List()|
|List([List(14, 17...|
|              List()|
|              List()|
|List([List(14, 17...|
+--------------------+

The question now is how to extract the text of the hashtags for each tweet.
I'm still new to SparkR. I am thinking maybe I need to loop through the
DataFrame to extract the text for each tweet, but it seems that lapply no
longer applies to a Spark DataFrame. Any thoughts on how to extract the
text, given that it is inside a JSON array?
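
To make the nesting concrete, here is a small plain-Python sketch of the extraction itself. It does not use SparkR or any Spark API; the two sample records are made up to match the printSchema() output above (an array of structs, each with indices and text), and hashtag_texts is a hypothetical helper.

```python
import json

# Hypothetical JSON lines shaped like the schema shown by printSchema().
sample_lines = [
    '{"id_str": "1", "hashtags": []}',
    '{"id_str": "2", "hashtags": ['
    '{"indices": [0, 3], "text": "ml"}, '
    '{"indices": [14, 17], "text": "spark"}]}',
]


def hashtag_texts(tweet):
    """Return the text field of every hashtag struct in one tweet."""
    return [tag["text"] for tag in tweet.get("hashtags") or []]


tweets = [json.loads(line) for line in sample_lines]
texts_by_id = {tweet["id_str"]: hashtag_texts(tweet) for tweet in tweets}

print(texts_by_id)
```

The point is that each row holds a list of structs, so the text values come from iterating over that list per tweet, whatever API ends up doing the iteration.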


Thanks, 


-JS




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-creating-dataframe-from-json-file-tp23849.html



Re: Random Forest Error

2015-07-15 Thread rishikesh
Thanks, that fixed the problem.

Cheers
Rishi



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Random-Forest-Error-tp23847p23850.html



Re: Java 8 vs Scala

2015-07-15 Thread Reinis Vicups
We have a complex application that has been running in production for a
couple of months and makes heavy use of Spark in Scala.


Just to give you some insight into the complexity: we do not have such huge
source data (only about 500'000 complex elements), but our machine learning
algorithms involve more than a billion transformations and intermediate data
elements.
Our current Spark/Mesos cluster consists of 120 CPUs, 190 GB RAM and
plenty of HDD space.


Now regarding your question:

- Scala is simply a beautiful language in itself; that has nothing to do
with Spark;


- the Spark API fits very naturally into Scala semantics, because the
map/reduce transformations are written more or less identically for local
collections and RDDs;


- as with any religious topic, there is controversial discussion about which
language is better, and most of the time (I have read quite a lot of
blog/forum topics on this) the argumentation is based on which religion one
belongs to (e.g. Java vs Scala vs Python);


- we have checked the supposed performance issues and limitations of Scala
described here (http://www.infoq.com/news/2011/11/yammer-scala) by
refactoring to the "best practices" described in the article, and have
observed a performance increase in some places and, at the same time, a
performance decrease in other places. Thus I would say there is no
noticeable performance difference between Scala and Java in our use case
(of course there are and always will be applications where one or the other
language performs better).
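
The remark about map/reduce looking the same on local collections and on RDDs can be illustrated with a local collection alone. The sketch below is plain Python rather than Scala and runs without Spark; the comparison to RDDs in the comment is stated as an assumption about the general shape of the API, not as code from this thread.

```python
from functools import reduce
from operator import add

values = [1, 2, 3, 4]

# Local collection: square every element, then sum the squares.
squares = map(lambda x: x * x, values)
total = reduce(add, squares)

# On a Spark RDD the same pipeline would presumably have the same shape,
# along the lines of rdd.map(lambda x: x * x).reduce(add).
print(total)
```

Only the source of the data changes between the two forms; the functions passed to map and reduce stay the same.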


hope I could help
reinis


On 15.07.2015 09:27, 诺铁 wrote:
> I think different teams will have different answers to this question. My
> team uses Scala and is happy with it.
>
> On Wed, Jul 15, 2015 at 1:31 PM, Tristan Blakers <tris...@blackfrog.org>
> wrote:
>
>> We have had excellent results operating on RDDs using Java 8 with
>> Lambdas. It’s slightly more verbose than Scala, but I haven’t
>> found this an issue, and haven’t missed any functionality.
>>
>> The new DataFrame API makes the Spark platform even more language
>> agnostic.
>>
>> Tristan
>>
>> On 15 July 2015 at 06:40, Vineel Yalamarthy
>> <vineelyalamar...@gmail.com> wrote:
>>
>>> Good question. Like you, many are in the same boat (coming from a Java
>>> background). Looking forward to a response from the community.
>>>
>>> Regards
>>> Vineel
>>>
>>> On Tue, Jul 14, 2015 at 2:30 PM, spark user
>>> <spark_u...@yahoo.com.invalid> wrote:
>>>
>>>> Hi All
>>>>
>>>> To start a new project in Spark, which technology is good: Java 8 or
>>>> Scala?
>>>>
>>>> I am a Java developer. Can I start with Java 8, or do I need to learn
>>>> Scala?
>>>>
>>>> Which one is the better technology for a quick start on a POC project?
>>>>
>>>> Thanks
>>>>
>>>> - su
>>>
>>> --
>>>
>>> Thanks and Regards,
>>> Venkata Vineel, Student, School of Computing
>>> Mobile : +1-385-2109-788
>>>
>>> -*Innovation is the ability to convert **ideas into invoices*







Re: creating a distributed index

2015-07-15 Thread Ankur Dave
The latest version of IndexedRDD supports any key type with a defined
serializer, including Strings. It's not released yet, but you can use it from
the master branch if you're interested.

Ankur 

On Wed, Jul 15, 2015 at 12:43 AM, Jem Tucker  wrote:

> With regards to Indexed structures in Spark are there any alternatives to
> IndexedRDD for more generic keys including Strings?
>
> Thanks
>
> Jem
>


DataFrame InsertIntoJdbc() Runtime Exception on cluster

2015-07-15 Thread Manohar753
Hi All,

I am trying to add a few new rows to an existing table in MySQL using a
DataFrame. The rows are added to the table in my local environment, but on
the Spark cluster I get the runtime exception below.


Exception in thread "main" java.lang.RuntimeException: Table msusers_1 already exists.
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:240)
    at org.apache.spark.sql.DataFrame.insertIntoJDBC(DataFrame.scala:1481)
    at com.sparkexpert.UserMigration.main(UserMigration.java:59)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/07/15 08:13:42 INFO spark.SparkContext: Invoking stop() from shutdown
hook
15/07/15 08:13:42 INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/metrics/json,null}
15/07/15 08:13:

The code snippet is below:

System.out.println(Query);
Map<String, String> options = new HashMap<>();
options.put("driver", PropertyLoader.getProperty(Constants.msSqlDriver));
options.put("url", PropertyLoader.getProperty(Constants.msSqlURL));
options.put("dbtable", Query);
options.put("numPartitions", "1");
DataFrame delatUsers = sqlContext.load("jdbc", options);

delatUsers.show();

// Load latest users DataFrame
String mysQuery = "(SELECT * FROM msusers_1) as employees_name";
Map<String, String> msoptions = new HashMap<>();
msoptions.put("driver", PropertyLoader.getProperty(Constants.mysqlDriver));
msoptions.put("url", PropertyLoader.getProperty(Constants.mysqlUrl));
msoptions.put("dbtable", mysQuery);
msoptions.put("numPartitions", "1");
DataFrame latestUsers = sqlContext.load("jdbc", msoptions);

// Get updated users data
DataFrame updatedUsers = delatUsers.as("ms")
    .join(latestUsers.as("lat"), col("lat.uid").equalTo(col("ms.uid")), "inner")
    .select("ms.revision", "ms.uid", "ms.UserType", "ms.FirstName",
        "ms.LastName", "ms.Email", "ms.smsuser_id", "ms.dev_acct",
        "ms.lastlogin", "ms.username", "ms.schoolAffiliation",
        "ms.authsystem_id", "ms.AdminStatus");

// Insert new users into the MySQL DB (this is the line that throws the exception)
delatUsers.except(updatedUsers)
    .insertIntoJDBC(PropertyLoader.getProperty(Constants.mysqlUrl), "msusers_1", false);

Team, please give me some input if anyone has come across this.
Overwriting the same table works fine on the cluster as well.

Thanks,
manoar



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-InsertIntoJdbc-Runtime-Exception-on-cluster-tp23851.html



Spark Stream suitability

2015-07-15 Thread polariz
Hi,

I am evaluating my options for a project that ingests a rich data feed,
does some aggregate calculations and allows the user to query the results.

The (protobuf) data feed is rich in the sense that it contains several data
fields which can be used to calculate several different KPI figures. The
KPIs are not related. 

I would like to explore the possibility of doing this work as data comes in,
using Spark Streaming. The examples I've seen and my gut tell me that
Spark Streaming apps should be kept simple: one data metric is processed in
one "pipeline" and persisted at the end. In my case I would need to ingest
the rich data and fork it into several pipelines, each calculating a different
KPI, and then persist them all at the end as one transaction.
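
The "fork into several pipelines, persist once" shape can be sketched in plain Python for a single batch of records. This is only an outline under assumptions: the record fields (user, bytes), the three KPI functions and the in-memory store are all hypothetical, and the wiring into Spark Streaming (for example, calling such a function once per micro-batch) is not shown.

```python
def kpi_event_count(batch):
    """KPI 1: number of records in the batch."""
    return len(batch)


def kpi_total_bytes(batch):
    """KPI 2: sum of the hypothetical bytes field."""
    return sum(record["bytes"] for record in batch)


def kpi_distinct_users(batch):
    """KPI 3: number of distinct values of the hypothetical user field."""
    return len({record["user"] for record in batch})


# Independent KPI "pipelines" that all read the same ingested batch.
KPIS = {
    "event_count": kpi_event_count,
    "total_bytes": kpi_total_bytes,
    "distinct_users": kpi_distinct_users,
}


def process_batch(batch, store):
    """Compute every KPI for one batch, then persist them together."""
    results = {name: compute(batch) for name, compute in KPIS.items()}
    store.append(results)  # one write, standing in for a single transaction
    return results


store = []
batch = [
    {"user": "a", "bytes": 100},
    {"user": "b", "bytes": 50},
    {"user": "a", "bytes": 25},
]
print(process_batch(batch, store))
```

Keeping each KPI as its own small function leaves the pipelines unrelated to each other, while the single append at the end models persisting all results in one step.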

Am I right in thinking that this complexity and aggregation work would be
better placed in separate offline Spark jobs?

Any feedback would be much appreciated, thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Stream-suitability-tp23852.html



Re: Strange behavior of CoalescedRDD

2015-07-15 Thread Konstantin Knizhnik
It looks like the source of the problem is in the SqlNewHadoopRDD.compute method:



The Parquet file reader it creates is intended to be closed at task completion
time.
This reader holds a lot of references to parquet.bytes.BytesInput objects,
which in turn hold references to large byte arrays (some of them
several megabytes).
Because a CoalescedRDD task completes only after processing a large number
of Parquet files, this causes file handle exhaustion and memory
overflow.
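
The difference between closing readers at task completion and closing them as each file is finished can be simulated in plain Python. This is not Spark's code: FakeReader and Tracker are hypothetical stand-ins, and the per-file variant is shown only for contrast, not as something Spark is known to do.

```python
from contextlib import ExitStack


class Tracker:
    """Counts how many fake readers are open at once."""

    def __init__(self):
        self.open_now = 0
        self.peak = 0

    def opened(self):
        self.open_now += 1
        self.peak = max(self.peak, self.open_now)

    def closed(self):
        self.open_now -= 1


class FakeReader:
    """Stand-in for a file reader that holds resources until closed."""

    def __init__(self, tracker):
        self.tracker = tracker

    def __enter__(self):
        self.tracker.opened()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.tracker.closed()
        return False


def close_at_task_end(num_files):
    """All readers stay open until the whole task finishes."""
    tracker = Tracker()
    with ExitStack() as stack:
        for _ in range(num_files):
            stack.enter_context(FakeReader(tracker))
    return tracker.peak


def close_per_file(num_files):
    """Each reader is closed before the next file is opened."""
    tracker = Tracker()
    for _ in range(num_files):
        with FakeReader(tracker):
            pass
    return tracker.peak


print(close_at_task_end(100), close_per_file(100))
```

With cleanup deferred to the end of the task, the number of simultaneously open readers grows with the number of files the task processes, which is the pattern the message describes for coalesced partitions.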

Unfortunately I didn't find any way to force cleanup of file readers.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behavior-of-CoalescedRDD-tp23819p23853.html



Re: creating a distributed index

2015-07-15 Thread Jem Tucker
This is very interesting, do you know if this version will be backwards
compatible with older versions of Spark (1.2.0)?

Thanks,

Jem

On Wed, Jul 15, 2015 at 10:04 AM Ankur Dave  wrote:

> The latest version of IndexedRDD supports any key type with a defined
> serializer, including Strings. It's not released yet, but you can use it from
> the master branch if you're interested.
>
> Ankur 
>
> On Wed, Jul 15, 2015 at 12:43 AM, Jem Tucker  wrote:
>
>> With regards to Indexed structures in Spark are there any alternatives to
>> IndexedRDD for more generic keys including Strings?
>>
>> Thanks
>>
>> Jem
>>
>


what is metadata in StructField ?

2015-07-15 Thread matd
I see in StructField that we can provide metadata.

What is it meant for ?  How is it used by Spark later on ?
Are there any rules on what we can/cannot do with it ?

I'm building some DataFrame processing, and I need to maintain a set of
(meta)data along with the DF. I was wondering if I can use
StructField.metadata for this use, or if I should build my own structure.

Mathieu



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-metadata-in-StructField-tp23854.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Java 8 vs Scala

2015-07-15 Thread Gourav Sengupta
Why would you create a class and then instantiate it to store data and
change the class every time you have to add a new element? In OOPS
terminology a class represents an object, and an object has states - does
it not?

Purely from a data warehousing perspective - one of the fundamental
principles in delivering a DW system is to ensure a Single Version of Truth
and that is what a Functional way of thinking naturally supports.

We can say by extension that data analytics algorithms are quite in tune
with functional way of thinking and therefore Scala, whereas object
oriented way of thinking needs to adapt itself to be functional. Of course
we can use OOPS concept for delivering data solutions just like we can
implement OOPS concept in C.

Java is good for solving certain things which require OOPS rigor and Scala
mostly in problems that can use functional way of problem solving - purely
from a data processing perspective.

Those who are using performance timings to compare these two languages
should start coding in Machine Level Language and then see the performance
gains in terms of Java and MLL and should switch over to MLL. Of course MLL
is a bit more verbose than Java just as Java is a bit more verbose than
Scala and Python - but who's complaining.

Of course, these are my personal thoughts and I may be completely wrong and
will be grateful if someone could illustrate how.


Regards,
Gourav


On Wed, Jul 15, 2015 at 10:03 AM, Reinis Vicups  wrote:

>  We have a complex application that runs productively for couple of
> months and heavily uses spark in scala.
>
> Just to give you some insight on complexity - we do not have such a huge
> source data (only about 500'000 complex elements), but we have more than a
> billion transformations and intermediate data elements we do with our
> machine learning algorithms.
> Our current spark/mesos cluster consists of 120 CPUs, 190 GB RAM and
> plenty of HDD space.
>
> Now regarding your question:
>
> - scala is just a beautiful language itself, it has nothing to do with
> spark;
>
> - spark api fits very naturally into scala semantics because the
> map/reduce transformations are written more or less identically for local
> collections and RDDs;
>
> - as with any religious topic, there is controversial discussion on what
> language is better and most of the time (I have read quite a lot of
> blog/forum topics on this) argumentation is based on what religion one
> belongs to (e.g. Java vs Scala vs Python)
>
> - we have checked supposed performance issues and limitations of scala
> described here: (http://www.infoq.com/news/2011/11/yammer-scala) by
> re-factoring to "best practices" described in the article and have observed
> both performance increase in some places and, at the same time, performance
> decrease in other places. Thus I would say there is no noticeable
> performance difference between scala vs java in our use case (of course
> there are and always will be applications where one or other language
> performs better);
>
> hope I could help
> reinis
>
>
>
> On 15.07.2015 09:27, 诺铁 wrote:
>
> I think different team got different answer for this question.  my team
> use scala, and happy with it.
>
> On Wed, Jul 15, 2015 at 1:31 PM, Tristan Blakers 
> wrote:
>
>> We have had excellent results operating on RDDs using Java 8 with
>> Lambdas. It’s slightly more verbose than Scala, but I haven’t found this an
>> issue, and haven’t missed any functionality.
>>
>>  The new DataFrame API makes the Spark platform even more language
>> agnostic.
>>
>>  Tristan
>>
>> On 15 July 2015 at 06:40, Vineel Yalamarthy 
>> wrote:
>>
>>>   Good   question. Like  you , many are in the same boat(coming from
>>> Java background). Looking forward to response from the community.
>>>
>>>  Regards
>>>  Vineel
>>>
>>> On Tue, Jul 14, 2015 at 2:30 PM, spark user <
>>> spark_u...@yahoo.com.invalid> wrote:
>>>
>>>> Hi All
>>>>
>>>> To start a new project in Spark, which technology is better: Java 8 or
>>>> Scala?
>>>>
>>>> I am a Java developer. Can I start with Java 8, or do I need to learn
>>>> Scala?
>>>>
>>>> Which one is the better technology for quickly starting a POC project?
>>>>
>>>> Thanks
>>>>
>>>> - su

>>>
>>>
>>>
>>> --
>>>
>>>  Thanks and Regards,
>>> Venkata Vineel, Student  ,School of Computing
>>>  Mobile : +1-385-2109-788
>>>
>>>  -*Innovation is the ability to convert **ideas into invoices*
>>>
>>>
>>
>
>


Re: spark-submit can not resolve spark-hive_2.10

2015-07-15 Thread Hao Ren
Thanks for the reply.

Actually, I don't think excluding spark-hive from spark-submit --packages
is a good idea.

I don't want to recompile spark by assembly for my cluster, every time a
new spark release is out.

I prefer using binary version of spark and then adding some jars for job
execution. e.g. Add spark-hive for HiveContext usage.

FYI, spark-hive is just 1.2MB:
http://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.10/1.4.0

On Wed, Jul 8, 2015 at 2:03 AM, Burak Yavuz  wrote:

> spark-hive is excluded when using --packages, because it can be included
> in the spark-assembly by adding -Phive during mvn package or sbt assembly.
>
> Best,
> Burak
>
> On Tue, Jul 7, 2015 at 8:06 AM, Hao Ren  wrote:
>
>> I want to add spark-hive as a dependency to submit my job, but it seems
>> that spark-submit cannot resolve it.
>>
>> $ ./bin/spark-submit \
>>   --packages org.apache.spark:spark-hive_2.10:1.4.0,org.postgresql:postgresql:9.3-1103-jdbc3,joda-time:joda-time:2.8.1 \
>>   --class fr.leboncoin.etl.jobs.dwh.AdStateTraceDWHTransform \
>>   --master spark://localhost:7077 \
>>
>> Ivy Default Cache set to: /home/invkrh/.ivy2/cache
>> The jars for the packages stored in: /home/invkrh/.ivy2/jars
>> https://repository.jboss.org/nexus/content/repositories/releases/ added
>> as a
>> remote repository with the name: repo-1
>> :: loading settings :: url =
>>
>> jar:file:/home/invkrh/workspace/scala/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
>> org.apache.spark#spark-hive_2.10 added as a dependency
>> org.postgresql#postgresql added as a dependency
>> joda-time#joda-time added as a dependency
>> :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
>> confs: [default]
>> found org.postgresql#postgresql;9.3-1103-jdbc3 in local-m2-cache
>> found joda-time#joda-time;2.8.1 in central
>> :: resolution report :: resolve 139ms :: artifacts dl 3ms
>> :: modules in use:
>> joda-time#joda-time;2.8.1 from central in [default]
>> org.postgresql#postgresql;9.3-1103-jdbc3 from local-m2-cache in
>> [default]
>>
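>> ---------------------------------------------------------------------
>> |                  |            modules            ||   artifacts   |
>> |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
>> ---------------------------------------------------------------------
>> |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
>> ---------------------------------------------------------------------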
>> :: retrieving :: org.apache.spark#spark-submit-parent
>> confs: [default]
>> 0 artifacts copied, 2 already retrieved (0kB/6ms)
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/sql/hive/HiveContext
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>>
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:633)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>> at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.spark.sql.hive.HiveContext
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> ... 7 more
>> Using Spark's default log4j profile:
>> org/apache/spark/log4j-defaults.properties
>> 15/07/07 16:57:59 INFO Utils: Shutdown hook called
>>
>> Any help is appreciated. Thank you.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-can-not-resolve-spark-hive-2-10-tp23695.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: what is metadata in StructField ?

2015-07-15 Thread Peter Rudenko

Hi Mathieu,
metadata is very useful if you need to save some data about a column 
(e.g. count of null values, cardinality, domain, min/max/std, etc.). 
It's currently used in the ml package in attributes: 
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/attribute/attributes.scala


Take a look at how I'm using metadata to get summary statistics from h2o: 
https://github.com/h2oai/sparkling-water/pull/17/files


Let me know if you have questions.

Thanks,
Peter Rudenko

On 2015-07-15 12:48, matd wrote:

I see in StructField that we can provide metadata.

What is it meant for ?  How is it used by Spark later on ?
Are there any rules on what we can/cannot do with it ?

I'm building some DataFrame processing, and I need to maintain a set of
(meta)data along with the DF. I was wondering if I can use
StructField.metadata for this use, or if I should build my own structure.

Mathieu



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-metadata-in-StructField-tp23854.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







DataFrame.write().partitionBy("some_column").parquet(path) produces OutOfMemory with very few items

2015-07-15 Thread Nikos Viorres
Hi,

I am trying to test partitioning for DataFrames with parquet usage, so I
attempted to do df.write().partitionBy("some_column").parquet(path) on a
small dataset of 20.000 records which, when saved as parquet locally with
gzip, takes 4 MB of disk space.
However, on my dev machine with
-Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g this always
fails with an OutOfMemoryError.
Does anyone have any ideas?

stack trace:
[Stage 2:>  (0 + 4) / 8]
2015-07-15 13:57:21,021 ERROR Logging$class Exception in task 3.0 in
stage 2.0 (TID 8)
java.lang.OutOfMemoryError: Java heap space
at
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at
parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
at
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at
parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at
org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
at
org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
at
org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
at
org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-07-15 13:57:21,051 ERROR Logging$class Uncaught exception in thread
Thread[Executor task launch worker-2,5,main]
java.lang.OutOfMemoryError: Java heap space
at
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at
parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
at
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at
parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at
org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
at
org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.sca

Any beginner samples for using ML / MLIB to produce a moving average of a (K, iterable[V])

2015-07-15 Thread Nkechi Achara
Hi all,

I am trying to get some summary statistics to retrieve the moving average
for several devices, each of which has an array of latencies in seconds, in
this kind of format:

deviceLatencyMap = [K:String, Iterable[V: Double]]

I understand that there is a MultivariateSummary, but as this is a trait,
I can't work out what to use in its stead.

If you need any more code, please let me know.

Thanks All.

K
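
A note on the moving-average part of the question above: the calculation itself does not need a summary-statistics class. The following is a minimal Python sketch under stated assumptions, not an MLlib API. The names moving_average and moving_average_by_key and the window parameter are hypothetical, and a plain dict stands in for the (K, Iterable[V]) pairs. In Spark, the per-key function could presumably be applied to each value with something like mapValues.

```python
from collections import deque
from typing import Dict, Iterable, List


def moving_average(values: Iterable[float], window: int) -> List[float]:
    """Simple moving average: one output per full window of `values`."""
    if window <= 0:
        raise ValueError("window must be positive")
    buf = deque()   # the values currently inside the window
    total = 0.0     # running sum of the values in `buf`
    out = []
    for v in values:
        buf.append(v)
        total += v
        if len(buf) > window:
            # Slide the window: drop the oldest value from the sum.
            total -= buf.popleft()
        if len(buf) == window:
            out.append(total / window)
    return out


def moving_average_by_key(latencies: Dict[str, Iterable[float]],
                          window: int) -> Dict[str, List[float]]:
    """Apply moving_average independently to each device's latencies."""
    return {device: moving_average(vals, window)
            for device, vals in latencies.items()}
```

For example, moving_average_by_key({"dev1": [1, 2, 3, 4, 5]}, 3) yields one average per full window of three readings for dev1. A device with fewer readings than the window gets an empty list.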


compression behaviour inconsistency between 1.3 and 1.4

2015-07-15 Thread Marcin Cylke
Hi

I've observed an inconsistent behaviour in .saveAsTextFile. 

Up until version 1.3 it was possible to save RDDs as snappy compressed
files with the invocation of

rdd.saveAsTextFile(targetFile)

but after upgrading to 1.4 this no longer works. I need to specify a
codec for that:

rdd.saveAsTextFile(targetFile, classOf[SnappyCodec])

As I understand I should be able to either set the appropriate codec
class or set those options globally on the cluster using properties. I
have the following settings in /etc/hadoop/conf/core-site.xml


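<property>
  <name>mapred.map.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

<property>
  <name>mapred.compress.map.output</name>
  <value>false</value>
</property>

<property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>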


The config hasn't changed between upgrading from 1.3 to 1.4.

What is the proper behaviour? Am I doing something strange here or has
this recently changed?

Regards
Marcin




Re: Java 8 vs Scala

2015-07-15 Thread Alan Burlison

On 15/07/2015 08:31, Ignacio Blasco wrote:


> The main advantage of using scala vs java 8 is being able to use a console


https://bugs.openjdk.java.net/browse/JDK-8043364

--
Alan Burlison
--




Spark and HDFS

2015-07-15 Thread Jeskanen, Elina
I have Spark 1.4 on my local machine and I would like to connect to our local
4-node Cloudera cluster. But how?

In the example it says text_file = spark.textFile("hdfs://..."), but can you
advise me on where to get this "hdfs://..." address?

Thanks!

Elina




updateStateByKey schedule time

2015-07-15 Thread Michel Hubert
Hi,


I want to implement a time-out mechanism in the updateStateByKey(...) routine.

Is there a way to retrieve the start time of the batch
corresponding to the call to my updateStateByKey routine?

If the stream has built up some delay, then System.currentTimeMillis()
will not be the time at which the batch was scheduled.

I want to retrieve the job/task schedule time of the batch for which my
updateStateByKey(..) routine is called.

Is this possible?

With kind regards,
Michel Hubert
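
To make the time-out idea in this question concrete, here is a minimal Python sketch of an update function that decides expiry from the batch time instead of the wall clock. It is an illustration under stated assumptions, not Spark's API: update_state, batch_time_ms, timeout_ms and the (count, last_seen) state layout are all hypothetical. How the batch time reaches the function is deliberately left open; one option that may be worth checking is tagging records with the batch time through the two-argument form of DStream.transform before the state update.

```python
from typing import List, Optional, Tuple

# Hypothetical state layout: (event_count, batch_time_ms of the last event).
State = Tuple[int, int]


def update_state(new_events: List[int],
                 state: Optional[State],
                 batch_time_ms: int,
                 timeout_ms: int) -> Optional[State]:
    """Return the new state for a key, or None to drop the key."""
    if new_events:
        # Activity in this batch: extend the count and refresh last-seen.
        previous_count = state[0] if state is not None else 0
        return (previous_count + len(new_events), batch_time_ms)
    if state is None:
        return None
    if batch_time_ms - state[1] >= timeout_ms:
        # No activity for at least timeout_ms of batch time: expire the key.
        return None
    return state
```

Because the comparison uses the batch time, a batch that is processed late is still judged against the time it was scheduled for, which is the behaviour the question asks for.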




RE: [SparkR] creating dataframe from json file

2015-07-15 Thread Sun, Rui
suppose df <- jsonFile(sqlContext, "")

You can extract hashtags.text as a Column object using the following command:
t <- getField(df$hashtags, "text")
and then you can perform operations on the column.

You can extract hashtags.text as a DataFrame using the following command:
   t <- select(df, getField(df$hashtags, "text"))
   showDF(t)

Or you can use SQL query to extract the field:
  hiveContext <- sparkRHive.init()
  df <-jsonFile(hiveContext,"")
  registerTempTable(df, "table")
  t <- sql(hiveContext, "select hashtags.text from table")
  showDF(t)

From: jianshu [jian...@gmail.com]
Sent: Wednesday, July 15, 2015 4:42 PM
To: user@spark.apache.org
Subject: [SparkR] creating dataframe from json file

hi all,

Not sure whether this is the right venue to ask. If not, please point me to the
right group, if there is one.

I'm trying to create a Spark DataFrame from a JSON file using jsonFile(). The
call was successful, and I can see the DataFrame created. The JSON file I
have contains a number of tweets obtained from the Twitter API. I am particularly
interested in pulling the hashtags contained in the tweets. If I use
printSchema(), the schema is something like:

root
 |-- id_str: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- text: string (nullable = true)

showDF() would show something like this :

++
|hashtags|
++
|  List()|
|List([List(125, 1...|
|  List()|
|List([List(0, 3),...|
|List([List(76, 86...|
|  List()|
|List([List(74, 84...|
|  List()|
|  List()|
|  List()|
|List([List(85, 96...|
|List([List(125, 1...|
|  List()|
|  List()|
|  List()|
|  List()|
|List([List(14, 17...|
|  List()|
|  List()|
|List([List(14, 17...|
++

The question now is how to extract the text of the hashtags for each tweet.
I'm still new to SparkR. I am thinking maybe I need to loop through the DataFrame
to extract it for each tweet, but it seems that lapply no longer applies to
Spark DataFrames. Any thoughts on how to extract the text, given that it will
be inside a JSON array?


Thanks,


-JS




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-creating-dataframe-from-json-file-tp23849.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Running mllib from R in Spark 1.4

2015-07-15 Thread madhu phatak
Hi,
I have been playing with the SparkR API that was introduced in Spark 1.4.
Can we use any MLlib functionality from R as of now? From the
documentation it looks like we can only use SQL/DataFrame functionality as
of now. I know there is a separate SparkR project, but it does not
seem to be maintained going forward.

So if I want to run machine learning on SparkR, what are the options as of
now?

-- 
Regards,
Madhukara Phatak
http://datamantra.io/


Re: Sorted Multiple Outputs

2015-07-15 Thread Eugene Morozov
Yiannis ,

It looks like you might explore another approach.

sc.textFile("input/path")
.map() // your own implementation
.partitionBy(new HashPartitioner(num))
.groupBy() // your own implementation; the result is a PairRDD of key vs Iterable of values
.foreachPartition()

In the last step you could sort all values for each key and store them in a 
separate file, even in the same directory as the files for other keys. 
HashPartitioner must guarantee that all values for a specific key reside in 
just one partition, but one partition might contain more 
than one key (with its values). I'm not sure about that, but it shouldn't be a big deal, 
as you would iterate over tuples of (key, Iterable of values) and store each key to a 
specific file.
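
The partition, group and sort steps described above can be sketched without Spark. This is a minimal, hedged Python illustration of the idea only: stable_partition and partition_and_sort are hypothetical names, a CRC32 hash stands in for whatever HashPartitioner does internally, and the file writing is left out (each key's sorted list is what would go into that key's file).

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def stable_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition index using a process-independent hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_and_sort(records: Iterable[Tuple[str, int]],
                       num_partitions: int) -> List[Dict[str, List[int]]]:
    """Group values by key inside hash partitions, then sort each group."""
    partitions = [defaultdict(list) for _ in range(num_partitions)]
    for key, value in records:
        # Every record with the same key lands in the same partition.
        partitions[stable_partition(key, num_partitions)][key].append(value)
    # A partition may hold several keys; each key's values are sorted
    # independently, ready to be written to that key's own output.
    return [{k: sorted(vs) for k, vs in part.items()} for part in partitions]
```

Each key ends up in exactly one partition, while a single partition can carry several keys, which matches the caveat in the message above.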

On 15 Jul 2015, at 03:23, Yiannis Gkoufas  wrote:

> Hi there,
> 
> I have been using the approach described here:
> 
> http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
> 
> In addition to that, I was wondering if there is a way to set the customize 
> the order of those values contained in each file.
> 
> Thanks a lot!

Eugene Morozov
fathers...@list.ru






Re: Sessionization using updateStateByKey

2015-07-15 Thread Cody Koeninger
I personally would try to avoid updateStateByKey for sessionization when
you have long sessions / a lot of keys, because it's linear on the number
of keys.

On Tue, Jul 14, 2015 at 6:25 PM, Tathagata Das  wrote:

> [Apologies for repost, for those who have seen this response already in
> the dev mailing list]
>
> 1. When you set ssc.checkpoint(checkpointDir), the spark streaming
> periodically saves the state RDD (which is a snapshot of all the state
> data) to HDFS using RDD checkpointing. In fact, a streaming app with
> updateStateByKey will not start until you set checkpoint directory.
>
> 2. The updateStateByKey performance is more or less independent of the
> source being used - receiver-based or direct Kafka. The
> absolute performance obviously depends on a LOT of variables: size of the
> cluster, parallelization, etc. The key thing is that you must ensure
> sufficient parallelization at every stage - receiving, shuffles
> (updateStateByKey included), and output.
>
> Some more discussion in my talk -
> https://www.youtube.com/watch?v=d5UJonrruHk
>
>
>
> On Tue, Jul 14, 2015 at 4:13 PM, swetha  wrote:
>
>>
>> Hi,
>>
>> I have a question regarding sessionization using updateStateByKey. If near
>> real time state needs to be maintained in a Streaming application, what
>> happens when the number of RDDs to maintain the state becomes very large?
>> Does it automatically get saved to HDFS and reload when needed or do I
>> have
>> to use any code like ssc.checkpoint(checkpointDir)?  Also, how is the
>> performance if I use both DStream Checkpointing for maintaining the state
>> and use Kafka Direct approach for exactly once semantics?
>>
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Sessionization-using-updateStateByKey-tp23838.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: [SparkR] creating dataframe from json file

2015-07-15 Thread jianshu Weng
Thanks.

t <- getField(df$hashtags, "text") does return a Column. But when I tried
to call t <- select(df, getField(df$hashtags, "text")), it would give an error:

Error: All select() inputs must resolve to integer column positions.
The following do not:
*  getField(df$hashtags, "text")

In fact, the "text" field in df is now returned as something like
List(, ). I want to flatten the list out and make the field
a string like ", ".

You mentioned in the email that "then you can perform operations on the
column.". Bear with me if you feel the question is too naive; I am still new
to SparkR. But what operations are allowed on the column? In the SparkR
documentation, I didn't find any specific function for column operations (
https://spark.apache.org/docs/latest/api/R/index.html). I didn't even find the
"getField" function in the documentation.

Thanks,

-JS

On Wed, Jul 15, 2015 at 8:42 PM, Sun, Rui  wrote:

> suppose df <- jsonFile(sqlContext, "")
>
> You can extract hashtags.text as a Column object using the following
> command:
> t <- getField(df$hashtags, "text")
> and then you can perform operations on the column.
>
> You can extract hashtags.text as a DataFrame using the following command:
>t <- select(df, getField(df$hashtags, "text"))
>showDF(t)
>
> Or you can use SQL query to extract the field:
>   hiveContext <- sparkRHive.init()
>   df <-jsonFile(hiveContext,"")
>   registerTempTable(df, "table")
>   t <- sql(hiveContext, "select hashtags.text from table")
>   showDF(t)
> 
> From: jianshu [jian...@gmail.com]
> Sent: Wednesday, July 15, 2015 4:42 PM
> To: user@spark.apache.org
> Subject: [SparkR] creating dataframe from json file
>
> hi all,
>
> Not sure whether this the right venue to ask. If not, please point me to
> the
> right group, if there is any.
>
> I'm trying to create a Spark DataFrame from JSON file using jsonFile(). The
> call was successful, and I can see the DataFrame created. The JSON file I
> have contains a number of tweets obtained from Twitter API. Am particularly
> interested in pulling the hashtags contains in the tweets. If I use
> printSchema(), the schema is something like:
>
> root
>  |-- id_str: string (nullable = true)
>  |-- hashtags: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- indices: array (nullable = true)
>  |    |    |    |-- element: long (containsNull = true)
>  |    |    |-- text: string (nullable = true)
>
> showDF() would show something like this :
>
> ++
> |hashtags|
> ++
> |  List()|
> |List([List(125, 1...|
> |  List()|
> |List([List(0, 3),...|
> |List([List(76, 86...|
> |  List()|
> |List([List(74, 84...|
> |  List()|
> |  List()|
> |  List()|
> |List([List(85, 96...|
> |List([List(125, 1...|
> |  List()|
> |  List()|
> |  List()|
> |  List()|
> |List([List(14, 17...|
> |  List()|
> |  List()|
> |List([List(14, 17...|
> ++
>
> The question is now how to extract the text of the hashtags for each tweet?
> Still new to SparkR. Am thinking maybe I need to loop through the dataframe
> to extract for each tweet. But it seems that lapply does not really apply
> on
> Spark DataFrame as more. Any though on how to extract the text, as it will
> be inside a JSON array.
>
>
> Thanks,
>
>
> -JS
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-creating-dataframe-from-json-file-tp23849.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Research ideas using spark

2015-07-15 Thread William Temperley
There seems to be a bit of confusion here - the OP (doing the PhD) had the
thread hijacked by someone with a similar name asking a mundane question.

It would be a shame to send someone away so rudely, who may do valuable
work on Spark.

Shashidhar (not Shahid!), I'm personally interested in running graph
algorithms for image segmentation using MLlib and Spark.  I've got many
questions though - like is it even going to give me a speed-up?  (
http://www.frankmcsherry.org/graph/scalability/cost/2015/01/15/COST.html)

It's not obvious to me which classes of graph algorithms can be implemented
correctly and efficiently in a highly parallel manner.  There's tons of
work to be done here, I'm sure. Also, look at parallel geospatial
algorithms - there's a lot of work being done on this.

Best, Will



On 15 July 2015 at 09:01, Vineel Yalamarthy 
wrote:

> Hi Daniel
>
> Well said
>
> Regards
> Vineel
>
> On Tue, Jul 14, 2015, 6:11 AM Daniel Darabos <
> daniel.dara...@lynxanalytics.com> wrote:
>
>> Hi Shahid,
>> To be honest I think this question is better suited for Stack Overflow
>> than for a PhD thesis.
>>
>> On Tue, Jul 14, 2015 at 7:42 AM, shahid ashraf  wrote:
>>
>>> hi
>>>
>>> I have a 10 node cluster  i loaded the data onto hdfs, so the no. of
>>> partitions i get is 9. I am running a spark application , it gets stuck on
>>> one of tasks, looking at the UI it seems application is not using all nodes
>>> to do calculations. attached is the screen shot of tasks, it seems tasks
>>> are put on each node more then once. looking at tasks 8 tasks get completed
>>> under 7-8 minutes and one task takes around 30 minutes so causing the delay
>>> in results.
>>>
>>>
>>> On Tue, Jul 14, 2015 at 10:48 AM, Shashidhar Rao <
>>> raoshashidhar...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am doing my PHD thesis on large scale machine learning e.g  Online
>>>> learning, batch and mini batch learning.
>>>>
>>>> Could somebody help me with ideas especially in the context of Spark
>>>> and to the above learning methods.
>>>>
>>>> Some ideas like improvement to existing algorithms, implementing new
>>>> features especially the above learning methods and algorithms that have not
>>>> been implemented etc.
>>>>
>>>> If somebody could help me with some ideas it would really accelerate my
>>>> work.
>>>>
>>>> Plus few ideas on research papers regarding Spark or Mahout.
>>>>
>>>> Thanks in advance.
>>>>
>>>> Regards
>>>>
>>>
>>>
>>>
>>> --
>>> with Regards
>>> Shahid Ashraf
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>>


Re: Spark and HDFS

2015-07-15 Thread ayan guha
Assuming you run Spark locally (i.e. either local mode or a standalone cluster
on your local m/c):
1. You need to have the Hadoop binaries locally
2. You need to have hdfs-site on the Spark classpath of your local m/c

I would suggest you start off with local files to play around.

If you need to run spark on CDH cluster using Yarn, then you need to use
spark-submit to yarn cluster. You can see a very good example here:
https://spark.apache.org/docs/latest/running-on-yarn.html
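
On the question of where the "hdfs://..." address comes from: it is normally the NameNode URI that the cluster's Hadoop client configuration declares under fs.defaultFS (fs.default.name in older Hadoop versions), usually in core-site.xml. The Python sketch below shows one way to read it out of such a file. find_default_fs is a hypothetical helper, and the hostname and port in SAMPLE are made up for illustration.

```python
import xml.etree.ElementTree as ET
from typing import Optional


def find_default_fs(core_site_xml: str) -> Optional[str]:
    """Return the default filesystem URI from core-site.xml text, if set."""
    root = ET.fromstring(core_site_xml)
    for prop in root.findall("property"):
        name = prop.findtext("name")
        # fs.default.name is the older spelling of the same setting.
        if name in ("fs.defaultFS", "fs.default.name"):
            value = prop.findtext("value")
            return value.strip() if value else None
    return None


# Hypothetical sample; a real cluster has its own host and port.
SAMPLE = """<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode.example.com:8020</value>
  </property>
</configuration>"""

print(find_default_fs(SAMPLE))
```

The returned URI, followed by the path of a file in HDFS, is what would go into the textFile(...) call.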



On Wed, Jul 15, 2015 at 10:36 PM, Jeskanen, Elina 
wrote:

>  I have Spark 1.4 on my local machine and I would like to connect to our
> local 4 nodes Cloudera cluster. But how?
>
>
>
> In the example it says text_file = spark.textFile("hdfs://..."), but can
> you advise me in where to get this "hdfs://..." -address?
>
>
>
> Thanks!
>
>
>
> Elina
>
>
>
>
>



-- 
Best Regards,
Ayan Guha


Re: Sessionization using updateStateByKey

2015-07-15 Thread algermissen1971
Hi Cody,

oh ... I thought that was one of *the* use cases for it. Do you have a 
suggestion / best practice for how to achieve the same thing with better scaling 
characteristics?

Jan

On 15 Jul 2015, at 15:33, Cody Koeninger  wrote:

> I personally would try to avoid updateStateByKey for sessionization when you 
> have long sessions / a lot of keys, because it's linear on the number of keys.
> 
> On Tue, Jul 14, 2015 at 6:25 PM, Tathagata Das  wrote:
> [Apologies for repost, for those who have seen this response already in the 
> dev mailing list]
> 
> 1. When you set ssc.checkpoint(checkpointDir), the spark streaming 
> periodically saves the state RDD (which is a snapshot of all the state data) 
> to HDFS using RDD checkpointing. In fact, a streaming app with 
> updateStateByKey will not start until you set checkpoint directory. 
> 
> 2. The updateStateByKey performance is sort of independent of the what is the 
> source that is being use - receiver based or direct Kafka. The absolutely 
> performance obvious depends on a LOT of variables, size of the cluster, 
> parallelization, etc. The key things is that you must ensure sufficient 
> parallelization at every stage - receiving, shuffles (updateStateByKey 
> included), and output. 
> 
> Some more discussion in my talk - https://www.youtube.com/watch?v=d5UJonrruHk
> 
> 
> 
> On Tue, Jul 14, 2015 at 4:13 PM, swetha  wrote:
> 
> Hi,
> 
> I have a question regarding sessionization using updateStateByKey. If near
> real time state needs to be maintained in a Streaming application, what
> happens when the number of RDDs to maintain the state becomes very large?
> Does it automatically get saved to HDFS and reload when needed or do I have
> to use any code like ssc.checkpoint(checkpointDir)?  Also, how is the
> performance if I use both DStream Checkpointing for maintaining the state
> and use Kafka Direct approach for exactly once semantics?
> 
> 
> Thanks,
> Swetha
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Sessionization-using-updateStateByKey-tp23838.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 
> 





java heap error

2015-07-15 Thread AlexG
I'm trying to compute the Frobenius norm error in approximating an
IndexedRowMatrix A with the product L*R where L and R are Breeze
DenseMatrices. 

I've written the following function that computes the squared error over
each partition of rows then sums up to get the total squared error (ignore
the mean argument, it is not used). It works on a smaller dataset that I've
been using to test my code, but fails on the full-sized A with an error
about the Java heap running out of space.

A here is an 8mil-by-100K matrix partitioned into 5015 parts, L is
8mil-by-16, and R is 16-by-100K. Together L and R take up way less memory
than I have on each executor (64 Gb), so I don't understand the cause of the
error.

def calcCenteredFrobNormErr(mat: IndexedRowMatrix, lhsTall: BDM[Double],
    rhsFat: BDM[Double], mean: BDV[Double]): Double = {
  val lhsFactor = mat.rows.context.broadcast(lhsTall)
  val rhsFactor = mat.rows.context.broadcast(rhsFat)

  def partitionDiffFrobNorm2(rowiter: Iterator[IndexedRow],
      lhsFactor: Broadcast[BDM[Double]],
      rhsFactor: Broadcast[BDM[Double]]): Iterator[Double] = {

    val lhsTall = lhsFactor.value
    val rhsFat = rhsFactor.value

    val rowlist = rowiter.toList
    val numrows = rowlist.length
    val matSubMat = BDM.zeros[Double](numrows, mat.numCols.toInt)
    val lhsSubMat = BDM.zeros[Double](numrows, lhsTall.cols)

    var currowindex = 0
    rowlist.foreach(
      (currow: IndexedRow) => {
        currow.vector.foreachActive { case (j, v) => matSubMat(currowindex, j) = v }
        lhsSubMat(currowindex, ::) := lhsTall(currow.index.toInt, ::)
        currowindex += 1
      }
    )

    val diffmat = matSubMat - lhsSubMat * rhsFat
    List(sum(diffmat :* diffmat)).iterator
  }

  report("Beginning to compute Frobenius norm", true)
  val res = mat.rows.mapPartitions(rowiter =>
    partitionDiffFrobNorm2(rowiter, lhsFactor, rhsFactor)).reduce(_ + _)
  report("Finished computing Frobenius norm", true)
  math.sqrt(res)
}
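
A rough back-of-the-envelope check of the dense blocks this function allocates
per partition may help explain the heap error. The sketch below is plain Python
arithmetic, not Spark code. It assumes the rows are spread evenly over the 5015
partitions and that every matrix holds 8-byte doubles; the helper name
dense_bytes is made up for illustration.

```python
BYTES_PER_DOUBLE = 8
GIB = 2 ** 30


def dense_bytes(rows, cols):
    """Size in bytes of a dense rows-by-cols matrix of doubles."""
    return rows * cols * BYTES_PER_DOUBLE


ROWS, COLS, RANK, PARTITIONS = 8_000_000, 100_000, 16, 5015

# Ceiling division: rows in the largest partition if rows are spread evenly (assumption).
rows_per_partition = -(-ROWS // PARTITIONS)

block_bytes = dense_bytes(rows_per_partition, COLS)  # one dense numrows-by-100K block
lhs_bytes = dense_bytes(ROWS, RANK)                  # broadcast L (8mil-by-16)
rhs_bytes = dense_bytes(RANK, COLS)                  # broadcast R (16-by-100K)

# matSubMat, lhsSubMat * rhsFat, diffmat and diffmat :* diffmat are all this size.
per_task_bytes = 4 * block_bytes

print(f"rows per partition : {rows_per_partition}")
print(f"one dense block    : {block_bytes / GIB:.2f} GiB")
print(f"four blocks / task : {per_task_bytes / GIB:.2f} GiB")
print(f"broadcast L + R    : {(lhs_bytes + rhs_bytes) / GIB:.2f} GiB")
```

Under these assumptions each task holds roughly four dense numrows-by-100K
blocks at once, so a handful of tasks running concurrently in one executor
could plausibly exhaust the heap even though L and R together are only about
1 GB. The number of concurrent tasks per executor is not stated above, so this
is only an estimate.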



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-heap-error-tp23856.html



Job aborted due to stage failure: Task not serializable:

2015-07-15 Thread Naveen Dabas



  

1) I am using the code below with the Kryo serializer. When I run it, I get
the error "Task not serializable" at the commented line.
2) How are broadcast variables treated in the executors? Are they local
variables, or can they be used in any function as if they were defined as
global variables?
import org.apache.spark.{SparkConf, SparkContext}

object StreamingLogInput {
  def main(args: Array[String]) {
    val master = args(0)
    val conf = new SparkConf().setAppName("StreamingLogInput")
    // Create a StreamingContext with a 1 second batch size
    val sc = new SparkContext(conf)
    val lines = sc.parallelize(List("eoore is test", "testing is error report"))
    // val ssc = new StreamingContext(sc, Seconds(30))
    // val lines = ssc.socketTextStream("localhost", )
    val filter = sc.textFile("/user/nadabas/filters/fltr")
    val filarr = filter.collect().toArray
    val broadcastVar = sc.broadcast(filarr)
    // val out = lines.transform { rdd => rdd.filter(x => fil(broadcastVar.value, x)) }
    val out = lines.filter(x => fil(broadcastVar.value, x)) // error is coming
    out.collect()
  }

  def fil(x1: Array[String], y1: String) = {
    val y = y1
    // val x = broadcastVar.value
    val x = x1
    var flag: Boolean = false
    for (a <- x) {
      if (y.contains(a))
        flag = true
    }
    flag
  }
}

   

DataFrame more efficient than RDD?

2015-07-15 Thread k0ala
Hi,

I have been working a bit with RDD, and am now taking a look at DataFrames.
The schema definition using case classes looks very attractive;

https://spark.apache.org/docs/1.4.0/sql-programming-guide.html#inferring-the-schema-using-reflection

  

Is a DataFrame more efficient (space-wise) than an RDD for the same case
class?

And in general, when should DataFrames be preferred over RDDs, and vice
versa?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-more-efficient-than-RDD-tp23857.html



Strange Error: "java.lang.OutOfMemoryError: GC overhead limit exceeded"

2015-07-15 Thread Saeed Shahrivari
I use a simple map/reduce step in a Java/Spark program to remove duplicated
documents from a large (10 TB compressed) sequence file containing some
html pages. Here is the partial code:

JavaPairRDD inputRecords =
sc.sequenceFile(args[0], BytesWritable.class,
NullWritable.class).coalesce(numMaps);


JavaPairRDD hashDocs =
inputRecords.mapToPair(t ->
cacheDocs.add(new Tuple2<>(BaseEncoding.base64()
.encode(Hashing.sha1().hashString(doc.getUrl(),
Charset.defaultCharset()).asBytes()), doc));
});


JavaPairRDD byteArrays =
hashDocs.reduceByKey((a, b) -> a.getUrl().length() < b.getUrl().length() ? a : b, numReds).
mapToPair(t -> new Tuple2<>(new
BytesWritable(PentV3.buildFromMessage(t._2).serializeUncompressed()),
NullWritable.get()));


The logic is simple. The map generates a SHA-1 signature from the html
and in the reduce phase we keep the html that has the shortest URL.
However, after running for 2-3 hours the application crashes due to
a memory issue. Here is the exception:

15/07/15 18:24:05 WARN scheduler.TaskSetManager: Lost task 267.0 in
stage 0.0 (TID 267, psh-11.nse.ir): java.lang.OutOfMemoryError: GC
overhead limit exceeded


It seems that the map function keeps the hashDocs RDD in memory,
and when the memory of an executor fills up, the application crashes.
Persisting the map output to disk solves the problem. Adding the
following line between map and reduce solves the issue:

hashDocs.persist(StorageLevel.DISK_ONLY());


Is this a bug in Spark?

How can I tell Spark not to keep even a bit of RDD in the memory?


Thanks


java.lang.NoClassDefFoundError: Could not initialize class org.fusesource.jansi.internal.Kernel32

2015-07-15 Thread Wang, Ningjun (LNG-NPV)
I just installed spark 1.3.1 on windows 2008 server. When I start spark-shell, 
I got the following error

Failed to created SparkJLineReader: java.lang.NoClassDefFoundError: Could not 
initialize class org.fusesource.jansi.internal.Kernel32

Please advise. Thanks.

Ningjun


Re: Running mllib from R in Spark 1.4

2015-07-15 Thread Burak Yavuz
Hi,
There is no MLlib support in SparkR in 1.4. There will be some support in
1.5. You can check these JIRAs for progress:
https://issues.apache.org/jira/browse/SPARK-6805
https://issues.apache.org/jira/browse/SPARK-6823

Best,
Burak

On Wed, Jul 15, 2015 at 6:00 AM, madhu phatak  wrote:

> Hi,
> I have been playing with Spark R API that is introduced in Spark 1.4
> version. Can we use any mllib functionality from the R as of now?. From the
> documentation it looks like we can only use SQL/Dataframe functionality as
> of now. I know there is separate project SparkR project but it doesnot
> seems to be maintained in future.
>
> So if I want to run machine learning on SparkR, what are the options as of
> now?
>
> --
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>


Re: Sessionization using updateStateByKey

2015-07-15 Thread Cody Koeninger
An in-memory hash key data structure of some kind so that you're close to
linear on the number of items in a batch, not the number of outstanding
keys.  That's more complex, because you have to deal with expiration for
keys that never get hit, and for unusually long sessions you have to either
drop them or hit durable storage.

Maybe someone has a better idea, I'd like to hear it.
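
To make the idea a bit more concrete, here is a minimal plain-Python sketch of
such a structure (not Spark code). The class name SessionStore, the
idle_timeout parameter and the (key, timestamp) event shape are all made up
for illustration, and it assumes timestamps arrive in non-decreasing order.
Keys are kept ordered by last activity, so expiring idle sessions only touches
the sessions that are actually closed rather than every outstanding key.

```python
from collections import OrderedDict


class SessionStore:
    """Hash-keyed session state, ordered by last activity (hypothetical helper)."""

    def __init__(self, idle_timeout):
        self.idle_timeout = idle_timeout
        # key -> [first_seen, last_seen, event_count]; least recently active first
        self.sessions = OrderedDict()

    def update(self, batch):
        """Apply one batch of (key, timestamp) events; cost is linear in len(batch)."""
        for key, ts in batch:
            state = self.sessions.get(key)
            if state is None:
                self.sessions[key] = [ts, ts, 1]
            else:
                state[1] = ts
                state[2] += 1
                self.sessions.move_to_end(key)  # now the most recently active key

    def expire(self, now):
        """Pop and return sessions idle for longer than idle_timeout."""
        closed = {}
        while self.sessions:
            key, state = next(iter(self.sessions.items()))
            if now - state[1] <= self.idle_timeout:
                break  # everything after this key was active more recently
            closed[key] = tuple(state)
            del self.sessions[key]
        return closed


store = SessionStore(idle_timeout=30)
store.update([("a", 0), ("b", 5)])
store.update([("a", 10)])
closed = store.expire(now=40)
print(closed)
```

Calling expire before each update keeps a key that comes back after its
timeout from silently extending the old session. The sketch keeps everything
in memory, so unusually long sessions would still have to be dropped or
spilled to durable storage as described above.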

On Wed, Jul 15, 2015 at 8:54 AM, algermissen1971  wrote:

> Hi Cody,
>
> oh ... I though that was one of *the* use cases for it. Do you have a
> suggestion / best practice how to achieve the same thing with better
> scaling characteristics?
>
> Jan
>
> On 15 Jul 2015, at 15:33, Cody Koeninger  wrote:
>
> > I personally would try to avoid updateStateByKey for sessionization when
> you have long sessions / a lot of keys, because it's linear on the number
> of keys.
> >
> > On Tue, Jul 14, 2015 at 6:25 PM, Tathagata Das 
> wrote:
> > [Apologies for repost, for those who have seen this response already in
> the dev mailing list]
> >
> > 1. When you set ssc.checkpoint(checkpointDir), the spark streaming
> periodically saves the state RDD (which is a snapshot of all the state
> data) to HDFS using RDD checkpointing. In fact, a streaming app with
> updateStateByKey will not start until you set checkpoint directory.
> >
> > 2. The updateStateByKey performance is sort of independent of the what
> is the source that is being use - receiver based or direct Kafka. The
> absolutely performance obvious depends on a LOT of variables, size of the
> cluster, parallelization, etc. The key things is that you must ensure
> sufficient parallelization at every stage - receiving, shuffles
> (updateStateByKey included), and output.
> >
> > Some more discussion in my talk -
> https://www.youtube.com/watch?v=d5UJonrruHk
> >
> >
> >
> > On Tue, Jul 14, 2015 at 4:13 PM, swetha 
> wrote:
> >
> > Hi,
> >
> > I have a question regarding sessionization using updateStateByKey. If
> near
> > real time state needs to be maintained in a Streaming application, what
> > happens when the number of RDDs to maintain the state becomes very large?
> > Does it automatically get saved to HDFS and reload when needed or do I
> have
> > to use any code like ssc.checkpoint(checkpointDir)?  Also, how is the
> > performance if I use both DStream Checkpointing for maintaining the state
> > and use Kafka Direct approach for exactly once semantics?
> >
> >
> > Thanks,
> > Swetha
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Sessionization-using-updateStateByKey-tp23838.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
> >
>
>


Tasks unevenly distributed in Spark 1.4.0

2015-07-15 Thread gisleyt
Hello all,

I upgraded from spark 1.3.1 to 1.4.0, but I'm experiencing a massive drop in
performance for the application I'm running. I've (somewhat) reproduced this
behaviour in the attached file. 

My current Spark setup may not be exactly optimal for this reproduction, but
I see that Spark 1.4.0 takes 12 minutes to complete, while 1.3.1 finishes in
8 minutes in this test. I've found that when you play about with subtraction
and sampling of JavaRDDs (see attached reproduction test), tasks do not seem
to be properly distributed among the workers when you're doing additional
operations on the data. I derive this from the admin view, where I clearly
see that in 1.4.0, tasks are distributed differently, and specifically, one
task consists of almost all the data, while the other tasks are tiny. 

 

Do any of you know of any changes to 1.4.0 that could explain this
behaviour? When submitting the same application to Spark 1.3.1, the tasks
are distributed uniformly, and the application is therefore much quicker.

Thanks,
Gisle

ReproduceHang.java

  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Tasks-unevenly-distributed-in-Spark-1-4-0-tp23858.html



Re: Spark on EMR with S3 example (Python)

2015-07-15 Thread Sujit Pal
Hi Roberto,

I think you would need to as Akhil said. Just checked from this page:

http://aws.amazon.com/public-data-sets/

and clicking through to a few dataset links, all of them are available on
s3 (some are also available via http and ftp). I think the point of these
datasets is that they are usually very large, so having them on s3 ensures
that it's easier to take your code to them than to bring the datasets to your
code.

-sujit


On Tue, Jul 14, 2015 at 1:56 PM, Pagliari, Roberto 
wrote:

> Hi Sujit,
>
> I just wanted to access public datasets on Amazon. Do I still need to
> provide the keys?
>
>
>
> Thank you,
>
>
>
>
>
> *From:* Sujit Pal [mailto:sujitatgt...@gmail.com]
> *Sent:* Tuesday, July 14, 2015 3:14 PM
> *To:* Pagliari, Roberto
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark on EMR with S3 example (Python)
>
>
>
> Hi Roberto,
>
>
>
> I have written PySpark code that reads from private S3 buckets, it should
> be similar for public S3 buckets as well. You need to set the AWS access
> and secret keys into the SparkContext, then you can access the S3 folders
> and files with their s3n:// paths. Something like this:
>
>
>
> sc = SparkContext()
>
> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_access_key)
>
> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
> aws_secret_key)
>
>
>
> mydata = sc.textFile("s3n://mybucket/my_input_folder") \
>
> .map(lambda x: do_something(x)) \
>
> .saveAsTextFile("s3://mybucket/my_output_folder")
>
> ...
>
>
>
> You can read and write sequence files as well - these are the only 2
> formats I have tried, but I'm sure the other ones like JSON would work
> also. Another approach is to embed the AWS access key and secret key into
> the s3n:// path.
>
>
>
> I wasn't able to use the s3 protocol, but s3n is equivalent (I believe its
> an older version but not sure) but it works for access.
>
>
>
> Hope this helps,
>
> Sujit
>
>
>
>
>
> On Tue, Jul 14, 2015 at 10:50 AM, Pagliari, Roberto <
> rpagli...@appcomsci.com> wrote:
>
> Is there an example about how to load data from a public S3 bucket in
> Python? I haven’t found any.
>
>
>
> Thank you,
>
>
>
>
>


Re: Efficiency of leftOuterJoin a cassandra rdd

2015-07-15 Thread Sujit Pal
Hi Wush,

One option may be to try a replicated join. Since your rdd1 is small, read
it into a collection and broadcast it to the workers, then filter your
larger rdd2 against the collection on the workers.
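
The shape of that replicated (map-side) join can be sketched in plain Python,
with lists of lists standing in for the partitions of rdd2. The function name
and the sample data are made up for illustration, and the sketch assumes the
keys of the small side are unique. In Spark the lookup table would be shipped
with sc.broadcast and the loop over a partition would run inside
mapPartitions.

```python
def replicated_left_outer_join(small_pairs, big_partitions):
    """Left outer join of a small keyed collection against a partitioned big one."""
    lookup = dict(small_pairs)  # the collection that would be broadcast
    matches = {key: [] for key in lookup}

    # Each partition is scanned independently; only rows whose key is in the
    # broadcast lookup survive, so nothing from the big side is shuffled.
    for partition in big_partitions:
        for key, value in partition:
            if key in lookup:
                matches[key].append(value)

    joined = []
    for key, left in lookup.items():
        rights = matches[key] or [None]  # None plays the role of a missing match
        joined.extend((key, (left, right)) for right in rights)
    return joined


small = [("a", 1), ("b", 2)]
big = [[("a", "x"), ("c", "y")], [("a", "z"), ("d", "w")]]
result = replicated_left_outer_join(small, big)
print(result)
```

Note that this avoids the shuffle but still reads every row of the large
side, so it would not by itself stop Spark from loading all the Cassandra
records; the joinWithCassandraTable approach mentioned in the thread addresses
that part.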

-sujit


On Tue, Jul 14, 2015 at 11:33 PM, Deepak Jain  wrote:

> Leftouterjoin and join apis are super slow in spark. 100x slower than
> hadoop
>
> Sent from my iPhone
>
> > On 14-Jul-2015, at 10:59 PM, Wush Wu  wrote:
> >
> > I don't understand.
> >
> > By the way, the `joinWithCassandraTable` does improve my query time
> > from 40 mins to 3 mins.
> >
> >
> > 2015-07-15 13:19 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
> >> I have explored spark joins for last few months (you can search my
> posts)
> >> and its frustrating useless.
> >>
> >>> On Tue, Jul 14, 2015 at 9:35 PM, Wush Wu  wrote:
> >>>
> >>> Dear all,
> >>>
> >>> I have found a post discussing the same thing:
> >>>
> >>>
> https://groups.google.com/a/lists.datastax.com/forum/#!searchin/spark-connector-user/join/spark-connector-user/q3GotS-n0Wk/g-LPTteCEg0J
> >>>
> >>> The solution is using "joinWithCassandraTable" and the documentation
> >>> is here:
> >>>
> https://github.com/datastax/spark-cassandra-connector/blob/v1.3.0-M2/doc/2_loading.md
> >>>
> >>> Wush
> >>>
> >>> 2015-07-15 12:15 GMT+08:00 Wush Wu :
> >>>> Dear all,
> >>>>
> >>>> I am trying to join two RDDs, named rdd1 and rdd2.
> >>>>
> >>>> rdd1 is loaded from a textfile with about 33000 records.
> >>>>
> >>>> rdd2 is loaded from a table in cassandra which has about 3 billions
> >>>> records.
> >>>>
> >>>> I tried the following code:
> >>>>
> >>>> ```scala
> >>>>
> >>>> val rdd1 : (String, XXX) = sc.textFile(...).map(...)
> >>>> import org.apache.spark.sql.cassandra.CassandraSQLContext
> >>>> cc.setKeyspace("xxx")
> >>>> val rdd2 : (String, String) = cc.sql("SELECT x, y FROM xxx").map(r =>
> >>>> ...)
> >>>>
> >>>> val result = rdd1.leftOuterJoin(rdd2)
> >>>> result.take(20)
> >>>>
> >>>> ```
> >>>>
> >>>> However, the log shows that the spark loaded 3 billions records from
> >>>> cassandra and only 33000 records left at the end.
> >>>>
> >>>> Is there a way to query the cassandra based on the key in rdd1?
> >>>>
> >>>> Here is some information of our system:
> >>>>
> >>>> - The spark version is 1.3.1
> >>>> - The cassandra version is 2.0.14
> >>>> - The key of joining is the primary key of the cassandra table.
> >>>>
> >>>> Best,
> >>>> Wush
> >>>
> >>> -
> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >>> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >>
> >>
> >> --
> >> Deepak
> >>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


out of memory error in treeAggregate

2015-07-15 Thread AlexG
I'm using the following function to compute B*A where B is a 32-by-8mil
Breeze DenseMatrix and A is a 8mil-by-100K IndexedRowMatrix. 

// computes BA where B is a local matrix and A is distributed: let b_i denote the
// ith col of B and a_i denote the ith row of A, then BA = sum(b_i a_i)

def leftMultiplyCenteredMatrixBy(mat: IndexedRowMatrix, lhs: DenseMatrix,
    avg: BDV[Double]): DenseMatrix = {
  val lhsBrz = lhs.toBreeze.asInstanceOf[BDM[Double]]
  val result =
    mat.rows.treeAggregate(BDM.zeros[Double](lhs.numRows.toInt, mat.numCols.toInt))(
      seqOp = (U: BDM[Double], row: IndexedRow) => {
        val rowBrz = row.vector.toBreeze.asInstanceOf[BSV[Double]] - avg
        U += lhsBrz(::, row.index.toInt) * rowBrz.t
      },
      combOp = (U1, U2) => U1 += U2
    )
  fromBreeze(result)
}

The accumulator used by the treeAggregate call is only 32-by-100K, and B is
less than a Gb. The executors have 64Gb RAM, yet the call fails with the
error

Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
  at java.util.Arrays.copyOf(Arrays.java:2271)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
  at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
  at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
  at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1072)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
  at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1067)
  at org.apache.spark.mllib.linalg.distributed.SVDVariants$.leftMultiplyCenteredMatrixBy(SVDVariants.scala:120)

Any idea what's going on/how to fix it?
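
One thing worth checking with simple arithmetic: the trace fails in the
"main" thread while ClosureCleaner serializes the closure into a byte array,
and seqOp references lhsBrz, so B would be serialized along with the closure.
The sketch below is plain Python, not Spark code. It assumes B is stored as
dense 8-byte doubles with exactly 32 rows and 8,000,000 columns; the helper
name dense_bytes is made up for illustration.

```python
BYTES_PER_DOUBLE = 8
JAVA_MAX_ARRAY = 2 ** 31 - 1  # a Java array is indexed by a signed 32-bit int


def dense_bytes(rows, cols):
    """Size in bytes of a dense rows-by-cols matrix of doubles."""
    return rows * cols * BYTES_PER_DOUBLE


b_bytes = dense_bytes(32, 8_000_000)  # lhsBrz, captured by the seqOp closure
acc_bytes = dense_bytes(32, 100_000)  # zero value / accumulator of treeAggregate

print(f"B           : {b_bytes:,} bytes")
print(f"accumulator : {acc_bytes:,} bytes")
print(f"B as a fraction of the largest Java byte array: {b_bytes / JAVA_MAX_ARRAY:.2f}")
print(f"B larger than 2**30 bytes: {b_bytes > 2 ** 30}")
```

If those dimensions are right, B alone is about 2 GB, which is very close to
the largest byte array the JVM can allocate, and a buffer that grows by
doubling would be expected to hit that ceiling once it passes 2**30 bytes.
That would match the "Requested array size exceeds VM limit" message
regardless of how much RAM the executors have. This is an inference from the
stack trace, not something verified against the job.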



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/out-of-memory-error-in-treeAggregate-tp23859.html



Re: Strange Error: "java.lang.OutOfMemoryError: GC overhead limit exceeded"

2015-07-15 Thread Ted Yu
bq. serializeUncompressed()

Is there a method which enables compression ?

Just wondering if that would reduce the memory footprint.

Cheers

On Wed, Jul 15, 2015 at 8:06 AM, Saeed Shahrivari <
saeed.shahriv...@gmail.com> wrote:

> I use a simple map/reduce step in a Java/Spark program to remove
> duplicated documents from a large (10 TB compressed) sequence file
> containing some html pages. Here is the partial code:
>
> JavaPairRDD inputRecords =
> sc.sequenceFile(args[0], BytesWritable.class, 
> NullWritable.class).coalesce(numMaps);
>
>
> JavaPairRDD hashDocs = inputRecords.mapToPair(t 
> ->
> cacheDocs.add(new Tuple2<>(BaseEncoding.base64()
> .encode(Hashing.sha1().hashString(doc.getUrl(), 
> Charset.defaultCharset()).asBytes()), doc));
> });
>
>
> JavaPairRDD byteArrays =
> hashDocs.reduceByKey((a, b) -> a.getUrl() < b.getUrl() ? a : b, numReds).
> mapToPair(t -> new Tuple2<>(new 
> BytesWritable(PentV3.buildFromMessage(t._2).serializeUncompressed()),
> NullWritable.get()));
>
>
> The logic is simple. The map generates a sha-1 signature from the html and in 
> the reduce phase we keep the html that has the shortest URL. However, after 
> running for 2-3 hours the application crashes due to memory issue. Here is 
> the exception:
>
> 15/07/15 18:24:05 WARN scheduler.TaskSetManager: Lost task 267.0 in stage 0.0 
> (TID 267, psh-11.nse.ir): java.lang.OutOfMemoryError: GC overhead limit 
> exceeded
>
>
> It seems that the map function keeps the hashDocs RDD in the memory and when 
> the memory is filled in an executor, the application crashes. Persisting the 
> map output to disk solves the problem. Adding the following line between map 
> and reduce solve the issue:
>
> hashDocs.persist(StorageLevel.DISK_ONLY());
>
>
> Is this a bug of Spark?
>
> How can I tell Spark not to keep even a bit of RDD in the memory?
>
>
> Thanks
>
>
>


Re: Research ideas using spark

2015-07-15 Thread Robin East
Well said Will. I would add that you might want to investigate GraphChi which 
claims to be able to run a number of large-scale graph processing tasks on a 
workstation much quicker than a very large Hadoop cluster. It would be 
interesting to know how widely applicable the approach GraphChi takes is and what 
implications it has for parallel/distributed computing approaches. A rich seam 
to mine indeed.

Robin
> On 15 Jul 2015, at 14:48, William Temperley  wrote:
> 
> There seems to be a bit of confusion here - the OP (doing the PhD) had the 
> thread hijacked by someone with a similar name asking a mundane question.
> 
> It would be a shame to send someone away so rudely, who may do valuable work 
> on Spark.
> 
> Sashidar (not Sashid!) I'm personally interested in running graph algorithms 
> for image segmentation using MLlib and Spark.  I've got many questions though 
> - like is it even going to give me a speed-up?  
> (http://www.frankmcsherry.org/graph/scalability/cost/2015/01/15/COST.html)
> 
> It's not obvious to me which classes of graph algorithms can be implemented 
> correctly and efficiently in a highly parallel manner.  There's tons of work 
> to be done here, I'm sure. Also, look at parallel geospatial algorithms - 
> there's a lot of work being done on this.
> 
> Best, Will
> 
> 
> 
> On 15 July 2015 at 09:01, Vineel Yalamarthy wrote:
> Hi Daniel
> 
> Well said
> 
> Regards 
> Vineel
> 
> 
> On Tue, Jul 14, 2015, 6:11 AM Daniel Darabos <daniel.dara...@lynxanalytics.com> wrote:
> Hi Shahid,
> To be honest I think this question is better suited for Stack Overflow than 
> for a PhD thesis.
> 
> On Tue, Jul 14, 2015 at 7:42 AM, shahid ashraf wrote:
> hi 
> 
> I have a 10 node cluster  i loaded the data onto hdfs, so the no. of 
> partitions i get is 9. I am running a spark application , it gets stuck on 
> one of tasks, looking at the UI it seems application is not using all nodes 
> to do calculations. attached is the screen shot of tasks, it seems tasks are 
> put on each node more then once. looking at tasks 8 tasks get completed under 
> 7-8 minutes and one task takes around 30 minutes so causing the delay in 
> results. 
> 
> 
> On Tue, Jul 14, 2015 at 10:48 AM, Shashidhar Rao wrote:
> Hi,
> 
> I am doing my PHD thesis on large scale machine learning e.g  Online 
> learning, batch and mini batch learning.
> 
> Could somebody help me with ideas especially in the context of Spark and to 
> the above learning methods. 
> 
> Some ideas like improvement to existing algorithms, implementing new features 
> especially the above learning methods and algorithms that have not been 
> implemented etc.
> 
> If somebody could help me with some ideas it would really accelerate my work.
> 
> Plus few ideas on research papers regarding Spark or Mahout.
> 
> Thanks in advance.
> 
> Regards 
> 
> 
> 
> -- 
> with Regards
> Shahid Ashraf
> 
> 
> 
> 
> 



Spark Accumulator Issue - java.io.IOException: java.lang.StackOverflowError

2015-07-15 Thread Jadhav Shweta
Hi,

I am trying one transformation by calling scala method
this scala method returns MutableList[AvroObject]

def processRecords(id: String, list1: Iterable[(String, GenericRecord)]): 
scala.collection.mutable.MutableList[AvroObject] 

Hence, the output of the transformation is RDD[MutableList[AvroObject]]

But I want o/p as RDD[AvroObject]

I tried applying foreach on RDD[MutableList[AvroObject]] --> RDD[AvroObject]

var uA = sparkContext.accumulableCollection[MutableList[AvroObject], 
universe](MutableList[AvroObject]())
rdd_list_avroObj.foreach(u => {
uA ++= u
})
var uRDD = sparkContext.parallelize(uA.value)

It's failing on a large dataset with the following error:

java.io.IOException: java.lang.StackOverflowError
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1140)
    at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:45)
    at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:226)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.StackOverflowError
    at java.io.ObjectOutputStream$HandleTable.hash(ObjectOutputStream.java:2359)
    at java.io.ObjectOutputStream$HandleTable.lookup(ObjectOutputStream.java:2292)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1115)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at java.util.ArrayList.writeObject(ArrayList.java:742)
    at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)

I have two queries regarding this issue:
Option 1: Is there a replacement for the accumulator?
Option 2: In the Scala method, instead of returning List[AvroObject], can I 
emit multiple AvroObject values so that I get RDD[AvroObject]?
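
For Option 2, the usual way to go from a collection per record to one element
per record is flatMap, which RDDs provide alongside map: the method can keep
returning a list, and flatMap concatenates the lists, so no accumulator is
needed on the driver. The plain-Python model below only illustrates the
difference in output shape. The names map_records, flat_map_records and
process_records are stand-ins made up for this sketch, and strings stand in
for AvroObject.

```python
from itertools import chain


def map_records(records, fn):
    """Model of map: one output element per input record."""
    return [fn(record) for record in records]


def flat_map_records(records, fn):
    """Model of flatMap: fn returns a collection and the results are concatenated."""
    return list(chain.from_iterable(fn(record) for record in records))


def process_records(group):
    """Hypothetical stand-in for processRecords: returns several objects per key."""
    group_id, values = group
    return [f"{group_id}:{value}" for value in values]


grouped = [("a", [1, 2]), ("b", [3])]

nested = map_records(grouped, process_records)     # shape of RDD[MutableList[AvroObject]]
flat = flat_map_records(grouped, process_records)  # shape of RDD[AvroObject]
print(nested)
print(flat)
```

In the Scala job this would presumably amount to calling flatMap instead of
map at the point where processRecords is applied, which also keeps the data
distributed rather than collecting 200GB worth of objects into an accumulator.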

Note:
I am using Spark 1.3.0
Input DataSize 200GB
Cluster 3 Machines(2 Cores, 8GB)
Spark running in YARN Mode

Thanks & Regards
Shweta Jadhav
Tata Consultancy Services Limited
Cell:- +91-9867515614
Mailto: jadhav.shw...@tcs.com
Website: http://www.tcs.com





Re: Spark and HDFS

2015-07-15 Thread Naveen Madhire
Yes, I did this recently. You need to copy the Cloudera cluster's
conf files to the local machine
and set HADOOP_CONF_DIR or YARN_CONF_DIR.

The local machine should also be able to ssh to the Cloudera cluster.
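
On where the "hdfs://..." address comes from: as far as I know it is the value of fs.defaultFS (fs.default.name in older configs) in core-site.xml, which is one of the conf files mentioned above. A minimal plain-Java sketch that reads it is below. The class name, the host namenode.example.com and the port 8020 are made up for illustration; your cluster's values will differ.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Illustrative helper only; it is not part of Spark or Hadoop.
final class DefaultFsLookup {
    // Returns the value of fs.defaultFS (or the older fs.default.name),
    // or null if neither property is present in the given core-site.xml text.
    static String defaultFs(String coreSiteXml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(coreSiteXml.getBytes(StandardCharsets.UTF_8)));
            NodeList props = doc.getElementsByTagName("property");
            for (int i = 0; i < props.getLength(); i++) {
                Element p = (Element) props.item(i);
                String name = text(p, "name");
                if ("fs.defaultFS".equals(name) || "fs.default.name".equals(name)) {
                    return text(p, "value");
                }
            }
            return null;
        } catch (Exception e) {
            throw new IllegalArgumentException("could not parse core-site.xml", e);
        }
    }

    // Text of the first child element with the given tag, or null if there is none.
    private static String text(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }

    public static void main(String[] args) {
        // Hypothetical core-site.xml content.
        String xml = "<configuration><property><name>fs.defaultFS</name>"
                + "<value>hdfs://namenode.example.com:8020</value></property></configuration>";
        // The full address is this prefix followed by the HDFS path of the file.
        System.out.println(defaultFs(xml) + "/path/to/file.txt");
    }
}
```

The resulting string is what would go into textFile(...). With HADOOP_CONF_DIR set, a bare path without the hdfs:// prefix may also resolve against the default filesystem, but I have not verified that on a CDH setup.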

On Wed, Jul 15, 2015 at 8:51 AM, ayan guha  wrote:

> Assuming you run spark locally (ie either local mode or standalone cluster
> on your localm/c)
> 1. You need to have hadoop binaries locally
> 2. You need to have hdfs-site on Spark Classpath of your local m/c
>
> I would suggest you to start off with local files to play around.
>
> If you need to run spark on CDH cluster using Yarn, then you need to use
> spark-submit to yarn cluster. You can see a very good example here:
> https://spark.apache.org/docs/latest/running-on-yarn.html
>
>
>
> On Wed, Jul 15, 2015 at 10:36 PM, Jeskanen, Elina 
> wrote:
>
>>  I have Spark 1.4 on my local machine and I would like to connect to our
>> local 4 nodes Cloudera cluster. But how?
>>
>>
>>
>> In the example it says text_file = spark.textFile("hdfs://..."), but can
>> you advise me in where to get this "hdfs://..." -address?
>>
>>
>>
>> Thanks!
>>
>>
>>
>> Elina
>>
>>
>>
>>
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Spark 1.4.0 compute-classpath.sh

2015-07-15 Thread lokeshkumar
Hi forum

I have downloaded the latest Spark version, 1.4.0, and started using it.
But I couldn't find the compute-classpath.sh file in bin/, which I used in
previous versions to provide third-party libraries to my application.

Can anyone please let me know where I can provide the CLASSPATH with my
third-party libs in 1.4.0?

Thanks
Lokesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-compute-classpath-sh-tp23860.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Research ideas using spark

2015-07-15 Thread Jörn Franke
Well, one of the strengths of Spark is standardized, general distributed
processing that allows many different types of processing, such as graph
processing, stream processing etc. The limitation is that it is less
performant than a system focusing on only one type of processing (e.g.
graph processing). I miss - and this may not be Spark-specific - some
artificial intelligence to manage a cluster, e.g. predicting workloads, how
long a job may run based on previously executed similar jobs etc.
Furthermore, many optimizations you have to do manually, e.g. Bloom
filters, partitioning etc. It would be great to find some intelligence here
as well that does this automatically based on previously executed jobs,
taking into account that the optimizations themselves change over time.
You may also explore feature interaction.

On Tue, Jul 14, 2015 at 7:19 AM, Shashidhar Rao wrote:

> Hi,
>
> I am doing my PHD thesis on large scale machine learning e.g  Online
> learning, batch and mini batch learning.
>
> Could somebody help me with ideas especially in the context of Spark and
> to the above learning methods.
>
> Some ideas like improvement to existing algorithms, implementing new
> features especially the above learning methods and algorithms that have not
> been implemented etc.
>
> If somebody could help me with some ideas it would really accelerate my
> work.
>
> Plus few ideas on research papers regarding Spark or Mahout.
>
> Thanks in advance.
>
> Regards
>


Re: Research ideas using spark

2015-07-15 Thread shahid ashraf
Sorry guys!

I mistakenly added my question to this thread (Research ideas using spark).
Moreover, people can ask any question; this Spark user group is for that.

Cheers!
😊

On Wed, Jul 15, 2015 at 9:43 PM, Robin East  wrote:

> Well said Will. I would add that you might want to investigate GraphChi
> which claims to be able to run a number of large-scale graph processing
> tasks on a workstation much quicker than a very large Hadoop cluster. It
> would be interesting to know how widely applicable the approach GraphChi
> takes and what implications it has for parallel/distributed computing
> approaches. A rich seam to mine indeed.
>
> Robin
>
> On 15 Jul 2015, at 14:48, William Temperley 
> wrote:
>
> There seems to be a bit of confusion here - the OP (doing the PhD) had the
> thread hijacked by someone with a similar name asking a mundane question.
>
> It would be a shame to send someone away so rudely, who may do valuable
> work on Spark.
>
> Sashidar (not Sashid!) I'm personally interested in running graph
> algorithms for image segmentation using MLlib and Spark.  I've got many
> questions though - like is it even going to give me a speed-up?  (
> http://www.frankmcsherry.org/graph/scalability/cost/2015/01/15/COST.html)
>
> It's not obvious to me which classes of graph algorithms can be
> implemented correctly and efficiently in a highly parallel manner.  There's
> tons of work to be done here, I'm sure. Also, look at parallel geospatial
> algorithms - there's a lot of work being done on this.
>
> Best, Will
>
>
>
> On 15 July 2015 at 09:01, Vineel Yalamarthy 
> wrote:
>
>> Hi Daniel
>>
>> Well said
>>
>> Regards
>> Vineel
>>
>> On Tue, Jul 14, 2015, 6:11 AM Daniel Darabos <
>> daniel.dara...@lynxanalytics.com> wrote:
>>
>>> Hi Shahid,
>>> To be honest I think this question is better suited for Stack Overflow
>>> than for a PhD thesis.
>>>
>>> On Tue, Jul 14, 2015 at 7:42 AM, shahid ashraf 
>>> wrote:
>>>
>>>> hi
>>>>
>>>> I have a 10 node cluster  i loaded the data onto hdfs, so the no. of
>>>> partitions i get is 9. I am running a spark application , it gets stuck on
>>>> one of tasks, looking at the UI it seems application is not using all nodes
>>>> to do calculations. attached is the screen shot of tasks, it seems tasks
>>>> are put on each node more then once. looking at tasks 8 tasks get completed
>>>> under 7-8 minutes and one task takes around 30 minutes so causing the delay
>>>> in results.
>>>>
>>>>
>>>> On Tue, Jul 14, 2015 at 10:48 AM, Shashidhar Rao <
>>>> raoshashidhar...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am doing my PHD thesis on large scale machine learning e.g  Online
>>>>> learning, batch and mini batch learning.
>>>>>
>>>>> Could somebody help me with ideas especially in the context of Spark
>>>>> and to the above learning methods.
>>>>>
>>>>> Some ideas like improvement to existing algorithms, implementing new
>>>>> features especially the above learning methods and algorithms that have not
>>>>> been implemented etc.
>>>>>
>>>>> If somebody could help me with some ideas it would really accelerate
>>>>> my work.
>>>>>
>>>>> Plus few ideas on research papers regarding Spark or Mahout.
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> Regards
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> with Regards
>>>> Shahid Ashraf
>>>>
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>
>>>
>
>


-- 
with Regards
Shahid Ashraf


Spark job returns a different result on each run

2015-07-15 Thread sbvarre
I am working on Scala code that performs linear regression on certain
datasets. Right now I am using 20 cores and 25 executors, and every time I run
the Spark job I get a different result.

The input files are 2 GB and 400 MB in size. However, when I run the job
with 20 cores and 1 executor, I get consistent results.

Has anyone experienced such a thing so far?

Please find the code below:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.Partitioner
import org.apache.spark.storage.StorageLevel

object TextProcess {
  def main(args: Array[String]) {
    val conf = new SparkConf().set("spark.serializer",
      "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val numExecutors = (conf.get("spark.executor.instances").toInt)
    // Read the 2 input files
    // First file is either cases / controls
    val input1 = sc.textFile(args(0))
    // Second file is Gene Expression
    val input2 = sc.textFile(args(1))

    //collecting header information
    val header1 = sc.parallelize(input1.take(1))
    val header2 = sc.parallelize(input2.take(1))

    //mapping data without the header information
    val map1 = input1.subtract(header1).map(x => (x.split(" ")(0)+x.split(" ")(1), x))
    val map2 = input2.subtract(header2).map(x => (x.split(" ")(0)+x.split(" ")(1), x))

    //joining data. here is where the order was getting affected.
    val joinedMap = map1.join(map2)

    //adding the header back to the top of RDD
    val x = header1.union(joinedMap.map{case(x,(y,z))=>y})
    val y = header2.union(joinedMap.map{case(x,(y,z))=>z})

    //removing irrelevant columns
    val rddX = x.map(x=>x.split(" ").drop(3)).zipWithIndex
      .map{case(a,b)=> a.map(x=>b.toString+" "+x.toString)}
    val rddY = y.map(x=>x.split(" ").drop(2)).zipWithIndex
      .map{case(a,b)=> a.map(x=>b.toString+" "+x.toString)}

    //transposing and cross joining data. This keeps the identifier at the start
    val transposedX = rddX.flatMap(x => x.zipWithIndex.map(x=>x.swap))
      .reduceByKey((a,b)=> a+":"+b)
      .map{case(a,b)=>b.split(":").sorted}
    val transposedY = rddY.flatMap(x => x.zipWithIndex.map(x=>x.swap))
      .reduceByKey((a,b)=> a+":"+b)
      .map{case(a,b)=>b.split(":").sorted}
      .persist(StorageLevel.apply(false, true, false, false, numExecutors))

    val cleanedX =
      transposedX.map(x=>x.map(x=>x.slice(x.indexOfSlice(" ")+1,x.length)))
    val cleanedY =
      transposedY.map(x=>x.map(x=>x.slice(x.indexOfSlice(" ")+1,x.length)))
        .persist(StorageLevel.apply(false, true, false, false, numExecutors))

    val cartXY = cleanedX.cartesian(cleanedY)
    val finalDataSet = cartXY.map{case(a,b)=>a zip b}
    //convert to key value pair
    val regressiondataset = finalDataSet.map(x=>(x(0),
      x.drop(1)
        .filter{case(a,b)=> a!="NA" && b!="NA" && a!="null" && b!="null"}
        .map{case(a,b)=> (a.toDouble, b.toDouble)}))

    // LinearRegression is my own class (not shown here)
    val linearOutput = regressiondataset.map(s => new
      LinearRegression(s._1 ,s._2).outputVal)

    linearOutput.saveAsTextFile(args(2))
    cleanedY.unpersist()
    transposedY.unpersist()
  }
}
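
One thing that may be worth checking, offered as an assumption rather than a diagnosis: join gives no ordering guarantee, and joinedMap is evaluated separately for x and for y, so with several executors the two zipWithIndex calls may number the rows differently from run to run. The plain-Java sketch below (no Spark, all names hypothetical, keys assumed unique) shows the general idea of fixing the order by key before numbering, and numbering both sides in a single pass so they stay aligned.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration only; no Spark involved and all names are hypothetical.
final class StableIndexing {
    // One joined record: the join key plus the matching line from each input file.
    static final class Joined {
        final String key;
        final String left;
        final String right;

        Joined(String key, String left, String right) {
            this.key = key;
            this.left = left;
            this.right = right;
        }
    }

    // Sorts by key first, then numbers each record exactly once, so the
    // left and right values of a record always share the same row index
    // no matter in which order the records arrived.
    static List<String> indexRows(List<Joined> arrived) {
        List<Joined> sorted = new ArrayList<>(arrived);
        sorted.sort(Comparator.comparing((Joined j) -> j.key));
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            Joined j = sorted.get(i);
            rows.add(i + " " + j.left + " | " + j.right);
        }
        return rows;
    }
}
```

In the Spark job this would roughly correspond to sorting joinedMap by key (and possibly persisting it) before the zipWithIndex step. Whether that actually explains the differing results here is untested.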





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-returns-a-different-result-on-each-run-tp23861.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Sessionization using updateStateByKey

2015-07-15 Thread Silvio Fiorito
Hi Cody,

I’ve had success using updateStateByKey for real-time sessionization by aging 
off timed-out sessions (returning None in the update function). This was on a 
large commercial website with millions of hits per day. This was over a year 
ago so I don’t have access to the stats any longer for length of sessions 
unfortunately, but I seem to remember they were around 10-30 minutes long. Even 
with peaks in volume, Spark managed to keep up very well.
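
To make the aging-off idea concrete, here is a rough plain-Java sketch of such an update function. It only mirrors the shape of what updateStateByKey expects (new values for a key in this batch plus the previous state, returning the new state or nothing to drop the key). java.util.Optional stands in for Scala's Option, and the Session class, field names and timeout parameter are invented for the example; this is not the code we ran in production.

```java
import java.util.List;
import java.util.Optional;

// Illustrative only: same shape as an updateStateByKey update function,
// but without any Spark dependency. All names are hypothetical.
final class SessionUpdate {
    static final class Session {
        final long lastSeenMs;
        final int hits;

        Session(long lastSeenMs, int hits) {
            this.lastSeenMs = lastSeenMs;
            this.hits = hits;
        }
    }

    // Returns the updated session, or Optional.empty() to age the session off.
    static Optional<Session> update(List<Long> hitTimesMs, Optional<Session> previous,
                                    long nowMs, long timeoutMs) {
        if (hitTimesMs.isEmpty()) {
            // No activity in this batch: keep the session unless it has timed out.
            return previous.filter(s -> nowMs - s.lastSeenMs < timeoutMs);
        }
        // Activity in this batch: extend the session (simplification: a hit that
        // arrives after the timeout still extends the old session).
        long lastSeen = previous.map(s -> s.lastSeenMs).orElse(Long.MIN_VALUE);
        for (long t : hitTimesMs) {
            lastSeen = Math.max(lastSeen, t);
        }
        int hits = previous.map(s -> s.hits).orElse(0) + hitTimesMs.size();
        return Optional.of(new Session(lastSeen, hits));
    }
}
```

Returning the empty value is what removes the key from the state, so timed-out sessions stop taking up space in later batches.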

Thanks,
Silvio

From: Cody Koeninger
Date: Wednesday, July 15, 2015 at 5:38 PM
To: algermissen1971
Cc: Tathagata Das, swetha, user
Subject: Re: Sessionization using updateStateByKey

An in-memory hash key data structure of some kind so that you're close to 
linear on the number of items in a batch, not the number of outstanding keys.  
That's more complex, because you have to deal with expiration for keys that 
never get hit, and for unusually long sessions you have to either drop them or 
hit durable storage.
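
As a rough illustration of what such a structure could look like (a sketch under my own assumptions, not something taken from a real job): an access-ordered LinkedHashMap keeps the least recently touched key at the front, so each batch costs work proportional to the batch size, and the expiry scan can stop at the first key that is still alive. The class and method names are hypothetical, timestamps are assumed to be non-decreasing across calls, and spilling long sessions to durable storage is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative in-memory session table; names are made up for this sketch.
final class SessionTable {
    // Access-ordered map: iteration starts at the least recently touched key.
    private final LinkedHashMap<String, Long> lastSeenMs =
            new LinkedHashMap<>(16, 0.75f, true);
    private final long timeoutMs;

    SessionTable(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Cost is proportional to the batch size, not to the number of open sessions.
    void touchAll(List<String> keysInBatch, long nowMs) {
        for (String key : keysInBatch) {
            lastSeenMs.put(key, nowMs); // put also moves the key to the "newest" end
        }
    }

    // Removes and returns the expired keys. Because the map is access-ordered and
    // timestamps are assumed non-decreasing, the scan can stop at the first live key.
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() < timeoutMs) {
                break;
            }
            expired.add(e.getKey());
            it.remove();
        }
        return expired;
    }

    int size() {
        return lastSeenMs.size();
    }
}
```

This handles the keys that never get hit again, since expire() finds them without visiting every outstanding key. It does nothing for fault tolerance, which is the part that makes the real thing more complex.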

Maybe someone has a better idea, I'd like to hear it.

On Wed, Jul 15, 2015 at 8:54 AM, algermissen1971 <algermissen1...@icloud.com> wrote:
Hi Cody,

oh ... I thought that was one of *the* use cases for it. Do you have a 
suggestion / best practice how to achieve the same thing with better scaling 
characteristics?

Jan

On 15 Jul 2015, at 15:33, Cody Koeninger <c...@koeninger.org> wrote:

> I personally would try to avoid updateStateByKey for sessionization when you 
> have long sessions / a lot of keys, because it's linear on the number of keys.
>
> On Tue, Jul 14, 2015 at 6:25 PM, Tathagata Das <t...@databricks.com> wrote:
> [Apologies for repost, for those who have seen this response already in the 
> dev mailing list]
>
> 1. When you set ssc.checkpoint(checkpointDir), Spark Streaming 
> periodically saves the state RDD (which is a snapshot of all the state data) 
> to HDFS using RDD checkpointing. In fact, a streaming app with 
> updateStateByKey will not start until you set a checkpoint directory.
>
> 2. The updateStateByKey performance is largely independent of which 
> source is being used - receiver-based or direct Kafka. The absolute 
> performance obviously depends on a LOT of variables: size of the cluster, 
> parallelization, etc. The key thing is that you must ensure sufficient 
> parallelization at every stage - receiving, shuffles (updateStateByKey 
> included), and output.
>
> Some more discussion in my talk - https://www.youtube.com/watch?v=d5UJonrruHk
>
>
>
> On Tue, Jul 14, 2015 at 4:13 PM, swetha <swethakasire...@gmail.com> wrote:
>
> Hi,
>
> I have a question regarding sessionization using updateStateByKey. If near
> real time state needs to be maintained in a Streaming application, what
> happens when the number of RDDs to maintain the state becomes very large?
> Does it automatically get saved to HDFS and reload when needed or do I have
> to use any code like ssc.checkpoint(checkpointDir)?  Also, how is the
> performance if I use both DStream Checkpointing for maintaining the state
> and use Kafka Direct approach for exactly once semantics?
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Sessionization-using-updateStateByKey-tp23838.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: 
> user-unsubscr...@spark.apache.org
> For additional commands, e-mail: 
> user-h...@spark.apache.org
>
>
>




Re: Unable to use dynamicAllocation if spark.executor.instances is set in spark-defaults.conf

2015-07-15 Thread Kelly, Jonathan
bump

From: Jonathan Kelly <jonat...@amazon.com>
Date: Tuesday, July 14, 2015 at 4:23 PM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Unable to use dynamicAllocation if spark.executor.instances is set in 
spark-defaults.conf

I've set up my cluster with a pre-calculated value for spark.executor.instances 
in spark-defaults.conf such that I can run a job and have it maximize the 
utilization of the cluster resources by default. However, if I want to run a 
job with dynamicAllocation (by passing -c spark.dynamicAllocation.enabled=true 
to spark-submit), I get this exception:

Exception in thread "main" java.lang.IllegalArgumentException: Explicitly setting the number of executors is not compatible with spark.dynamicAllocation.enabled!
at org.apache.spark.deploy.yarn.ClientArguments.parseArgs(ClientArguments.scala:192)
at org.apache.spark.deploy.yarn.ClientArguments.<init>(ClientArguments.scala:59)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:54)
...

The exception makes sense, of course, but ideally I would like it to ignore 
what I've put in spark-defaults.conf for spark.executor.instances if I've 
enabled dynamicAllocation. The most annoying thing about this is that if I have 
spark.executor.instances present in spark-defaults.conf, I cannot figure out 
any way to spark-submit a job with spark.dynamicAllocation.enabled=true without 
getting this error. That is, even if I pass "-c spark.executor.instances=0 -c 
spark.dynamicAllocation.enabled=true", I still get this error because the 
validation in ClientArguments.parseArgs() that's checking for this condition 
simply checks for the presence of spark.executor.instances rather than whether 
or not its value is > 0.

Should the check be changed to allow spark.executor.instances to be set to 0 if 
spark.dynamicAllocation.enabled is true? That would be an OK compromise, but 
I'd really prefer to be able to enable dynamicAllocation simply by setting 
spark.dynamicAllocation.enabled=true rather than by also having to set 
spark.executor.instances to 0.
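
To spell out the difference between the check as I understand it today and the compromise suggested above, here is a small stand-alone Java sketch. It is not the actual ClientArguments code; the class and method names are made up, and a plain Map stands in for the Spark configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone model of the validation; not Spark's real code.
final class AllocationCheck {
    private static boolean dynamicAllocation(Map<String, String> conf) {
        return Boolean.parseBoolean(
                conf.getOrDefault("spark.dynamicAllocation.enabled", "false"));
    }

    // Behaviour as described above: the mere presence of the key is rejected.
    static boolean rejectedByPresence(Map<String, String> conf) {
        return dynamicAllocation(conf) && conf.containsKey("spark.executor.instances");
    }

    // Suggested compromise: only a positive explicit executor count is rejected.
    static boolean rejectedByValue(Map<String, String> conf) {
        int instances = Integer.parseInt(
                conf.getOrDefault("spark.executor.instances", "0"));
        return dynamicAllocation(conf) && instances > 0;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.dynamicAllocation.enabled", "true");
        conf.put("spark.executor.instances", "0");
        System.out.println("presence check rejects: " + rejectedByPresence(conf));
        System.out.println("value check rejects: " + rejectedByValue(conf));
    }
}
```

With spark.executor.instances=0 the presence-based check still rejects the job, while the value-based one would let it through.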

Thanks,
Jonathan


Re: Sessionization using updateStateByKey

2015-07-15 Thread Sean McNamara
I would just like to add that we do the very same/similar thing at Webtrends. 
updateStateByKey has been a life-saver for our sessionization use-cases.

Cheers,

Sean


On Jul 15, 2015, at 11:20 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

Hi Cody,

I’ve had success using updateStateByKey for real-time sessionization by aging 
off timed-out sessions (returning None in the update function). This was on a 
large commercial website with millions of hits per day. This was over a year 
ago so I don’t have access to the stats any longer for length of sessions 
unfortunately, but I seem to remember they were around 10-30 minutes long. Even 
with peaks in volume, Spark managed to keep up very well.

Thanks,
Silvio






Re: Sessionization using updateStateByKey

2015-07-15 Thread Cody Koeninger
Don't get me wrong, we've been able to use updateStateByKey for some jobs,
and it's certainly convenient.  At a certain point though, iterating
through every key on every batch is a less viable solution.

On Wed, Jul 15, 2015 at 12:38 PM, Sean McNamara  wrote:

>  I would just like to add that we do the very same/similar thing
> at Webtrends, updateStateByKey has been a life-saver for our sessionization
> use-cases.
>
>  Cheers,
>
>  Sean
>
>
>  On Jul 15, 2015, at 11:20 AM, Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>   Hi Cody,
>
>  I’ve had success using updateStateByKey for real-time sessionization by
> aging off timed-out sessions (returning None in the update function). This
> was on a large commercial website with millions of hits per day. This was
> over a year ago so I don’t have access to the stats any longer for length
> of sessions unfortunately, but I seem to remember they were around 10-30
> minutes long. Even with peaks in volume, Spark managed to keep up very well.
>
>  Thanks,
> Silvio
>
>
>


Re: Sessionization using updateStateByKey

2015-07-15 Thread algermissen1971

On 15 Jul 2015, at 17:38, Cody Koeninger  wrote:

> An in-memory hash key data structure of some kind so that you're close to 
> linear on the number of items in a batch, not the number of outstanding keys. 
>  That's more complex, because you have to deal with expiration for keys that 
> never get hit, and for unusually long sessions you have to either drop them 
> or hit durable storage.

Thanks, yes. I do the expiration check already to terminate 'active' sessions 
and flush them to durable storage afterwards.

Excuse my newbie state: when doing this with my own data structure (e.g. such 
a hash), where should I execute the code that periodically checks the hash? 
Right now I am doing that in updateStateByKey - should I rather use foreachRDD?

And: if I understand you correctly, you are saying that updateStateByKey is 
more suitable for e.g. updating 'entities' of which only a limited number exist 
(the users of the visits or the products sold). Yes?

Jan







RE: fileStream with old files

2015-07-15 Thread Hunter Morgan
After moving the setting of the parameter to SparkConf initialization, instead 
of setting it after the context is already initialized, I have it operating 
reliably on the local filesystem, but not on HDFS. Are there any differences in 
behavior between these two cases I should be aware of?

I don’t usually use mailing lists or Exchange, so forgive me if the formatting 
of this message goes horribly wrong.

I plan to port the following code to Hadoop FS API to generalize testing to 
understand actual behavior and ensure desired behavior.

public static JavaDStream<String> textFileStreamIncludingExisting(JavaStreamingContext context, String path)
{
    // accept every file (v1 -> true) and do not restrict to new files (newFilesOnly = false)
    return context.fileStream(path, LongWritable.class, Text.class, TextInputFormat.class, v1 -> true, false)
        .map(v1 -> v1._2.toString());
}

@Test
public void testTextFileStreamIncludingExistingReadsOldFiles() throws Exception
{
    final Path testDir = Files.createTempDirectory("sparkTest");
    final ArrayList<Path> tempFiles = new ArrayList<Path>();

    // create 20 "old" files
    final int testFileNumberLimit = 20;
    for (int testFileNumber = 0; testFileNumber < testFileNumberLimit; testFileNumber++)
    {
        final Path testFile = Files.createTempFile(testDir, "testFile", "");
        tempFiles.add(testFile);
        final FileWriter fileWriter = new FileWriter(testFile.toFile());
        fileWriter.write("asdf");
        fileWriter.flush();
        fileWriter.close();
        for (String eachAttribute : new String[]{"basic:lastAccessTime", "basic:lastModifiedTime",
            "basic:creationTime"})
        { // set file dates 0 to 20 days ago
            Files.setAttribute(testFile, eachAttribute,
                FileTime.from(Instant.now().minus(Duration.ofDays(testFileNumber))));
        }
    }

    // the setting goes on the SparkConf before the streaming context is created
    final SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("test");
    sparkConf.set("spark.streaming.minRememberDuration", String.valueOf(Integer.MAX_VALUE));
    final JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    final JavaDStream<String> input =
        SparkUtil.textFileStreamIncludingExisting(context, String.valueOf(testDir.toUri()));
    // count files read
    final Accumulator<Integer> accumulator = context.sparkContext().accumulator(0);

    // setup async wait
    Semaphore done = new Semaphore(1);
    done.acquire();
    input.foreachRDD(new Function<JavaRDD<String>, Void>()
    {
        @Override
        public Void call(JavaRDD<String> v1) throws Exception
        {
            // an empty batch is taken as the signal that all files were read
            if (v1.count() == 0)
            {
                done.release();
            }
            accumulator.add((int) v1.count());
            return null;
        }
    });
    context.start();
    // wait for completion or 20 sec
    done.tryAcquire(20, TimeUnit.SECONDS);
    context.stop();

    assertThat(accumulator.value(), is(testFileNumberLimit));

    for (Path eachTempFile : tempFiles)
    {
        Files.deleteIfExists(eachTempFile);
    }
    Files.deleteIfExists(testDir);
}


From: Tathagata Das [mailto:t...@databricks.com]
Sent: Wednesday, July 15, 2015 00:01
To: Terry Hole
Cc: Hunter Morgan; user@spark.apache.org
Subject: Re: fileStream with old files

It was added, but it's not documented publicly. I am planning to change the name 
of the conf to spark.streaming.fileStream.minRememberDuration to make it easier 
to understand.

On Mon, Jul 13, 2015 at 9:43 PM, Terry Hole <hujie.ea...@gmail.com> wrote:
A new configuration named spark.streaming.minRememberDuration has existed since 
1.2.1 to control the file stream input. The default value is 60 seconds; you 
can change it to a larger value to include older files (older than 1 
minute).

You can get the detail from this jira: 
https://issues.apache.org/jira/browse/SPARK-3276

-Terry

On Tue, Jul 14, 2015 at 4:44 AM, automaticgiant <hunter.mor...@rackspace.com> wrote:
It's not as odd as it sounds. I want to ensure that long streaming job
outages can recover all the files that went into a directory while the job
was down.
I've looked at
http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
and
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
and
https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
, but all seem unhelpful.
I've tested combinations of the following:
 * fileStreams created with dumb accept-all filters
 * newFilesOnly true and false,
 * tweaking minRememberDuration to high and low values,
 * on hdfs or local directory.
The problem is that it will not read files in the directory from more than a
minute ago.
JavaPairInputDStream<LongWritable, Text> input = context.fileStream(indir,
LongWritable.class, Text.class, TextInputFormat.class, v -> true, false);
Also tried with having set:
context.sparkContext().getConf().set("spark.streaming.minRememberDuration",
"1654564"); to big/s

Re: Efficiency of leftOuterJoin a cassandra rdd

2015-07-15 Thread Wush Wu
Dear Sujit,

Thanks for your suggestion.

After testing, `joinWithCassandraTable` does the trick, as you
mentioned.

rdd2 now queries only the data whose key also appears in rdd1.

Best,
Wush

2015-07-16 0:00 GMT+08:00 Sujit Pal :
> Hi Wush,
>
> One option may be to try a replicated join. Since your rdd1 is small, read
> it into a collection and broadcast it to the workers, then filter your
> larger rdd2 against the collection on the workers.
>
> -sujit
>
>
> On Tue, Jul 14, 2015 at 11:33 PM, Deepak Jain  wrote:
>>
>> Leftouterjoin and join apis are super slow in spark. 100x slower than
>> hadoop
>>
>> Sent from my iPhone
>>
>> > On 14-Jul-2015, at 10:59 PM, Wush Wu  wrote:
>> >
>> > I don't understand.
>> >
>> > By the way, the `joinWithCassandraTable` does improve my query time
>> > from 40 mins to 3 mins.
>> >
>> >
>> > 2015-07-15 13:19 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
>> >> I have explored spark joins for the last few months (you can search my
>> >> posts)
>> >> and it's frustratingly useless.
>> >>
>> >>> On Tue, Jul 14, 2015 at 9:35 PM, Wush Wu  wrote:
>> >>>
>> >>> Dear all,
>> >>>
>> >>> I have found a post discussing the same thing:
>> >>>
>> >>>
>> >>> https://groups.google.com/a/lists.datastax.com/forum/#!searchin/spark-connector-user/join/spark-connector-user/q3GotS-n0Wk/g-LPTteCEg0J
>> >>>
>> >>> The solution is using "joinWithCassandraTable" and the documentation
>> >>> is here:
>> >>>
>> >>> https://github.com/datastax/spark-cassandra-connector/blob/v1.3.0-M2/doc/2_loading.md
>> >>>
>> >>> Wush
>> >>>
>> >>> 2015-07-15 12:15 GMT+08:00 Wush Wu :
>> >>>> Dear all,
>> >>>>
>> >>>> I am trying to join two RDDs, named rdd1 and rdd2.
>> >>>>
>> >>>> rdd1 is loaded from a text file with about 33000 records.
>> >>>>
>> >>>> rdd2 is loaded from a table in Cassandra which has about 3 billion
>> >>>> records.
>> >>>>
>> >>>> I tried the following code:
>> >>>>
>> >>>> ```scala
>> >>>>
>> >>>> import org.apache.spark.rdd.RDD
>> >>>> import org.apache.spark.sql.cassandra.CassandraSQLContext
>> >>>>
>> >>>> val rdd1: RDD[(String, XXX)] = sc.textFile(...).map(...)
>> >>>> val cc = new CassandraSQLContext(sc)
>> >>>> cc.setKeyspace("xxx")
>> >>>> val rdd2: RDD[(String, String)] = cc.sql("SELECT x, y FROM xxx").map(r =>
>> >>>> ...)
>> >>>>
>> >>>> val result = rdd1.leftOuterJoin(rdd2)
>> >>>> result.take(20)
>> >>>>
>> >>>> ```
>> >>>>
>> >>>> However, the log shows that Spark loaded 3 billion records from
>> >>>> Cassandra and only 33000 records were left at the end.
>> >>>>
>> >>>> Is there a way to query Cassandra based on the keys in rdd1?
>> >>>>
>> >>>> Here is some information on our system:
>> >>>>
>> >>>> - The Spark version is 1.3.1
>> >>>> - The Cassandra version is 2.0.14
>> >>>> - The join key is the primary key of the Cassandra table.
>> >>>>
>> >>>> Best,
>> >>>> Wush
>> >>>
>> >>> -
>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >>
>> >>
>> >> --
>> >> Deepak
>> >>
>>
>>
>
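
The replicated join Sujit describes in this thread (hold the small rdd1 in memory on every worker and filter the large rdd2 against it) boils down to a hash lookup per record of the large side. A minimal plain-Java sketch of that idea follows; it uses ordinary collections rather than Spark, and ReplicatedJoinSketch and join are hypothetical names. Note that this keeps only keys present on both sides, whereas leftOuterJoin would also keep rdd1 keys with no match, and that the large side is still read in full, which is the cost `joinWithCassandraTable` avoids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ReplicatedJoinSketch {
    // small: the in-memory copy of the small side (what would be broadcast).
    // large: (key, value) pairs of the large side, streamed once.
    public static List<String> join(Map<String, String> small, List<String[]> large) {
        List<String> out = new ArrayList<>();
        for (String[] kv : large) {
            String left = small.get(kv[0]); // hash lookup, no shuffle of the large side
            if (left != null) {
                out.add(kv[0] + ":" + left + ":" + kv[1]);
            }
        }
        return out;
    }
}
```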




Info from the event timeline appears to contradict dstat info

2015-07-15 Thread Tom Hubregtsen
I am trying to analyze my program, in particular to see what the bottleneck
is (IO, CPU, network), and started using the event timeline for this. 

When looking at my Job 0, Stage 0 (the sampler function taking up 5.6
minutes of my 40 minute program), I see in the event timeline that all time
is spent in "Executor Computing Time." I am not quite sure what this means.
I first thought that because of this metric, I could immediately assume that
I was CPU bound, but this does not line up with my dstat log. When looking
at dstat, I see that I spend 65% in CPU wait, 17% in CPU system and only 18%
in CPU user, together with disk IO being fully utilized for the entire
duration of the stage. From this data, I would assume I am actually disk
bound.

My question based on this is: How do I interpret the label "Executor
Computing Time," and what conclusions can I draw from it? 
As I do not see read input/write output as one of the 7 labels, is IO meant
to be part of the "Executor Computing Time" (even though shuffle IO seems to
be separate)? Can I use information from event timeline as a basis for any
conclusions on my bottleneck (IO, CPU or network)? Is network included in
any of these 7 labels?

Thanks in advance,

Tom Hubregtsen




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Info-from-the-event-timeline-appears-to-contradict-dstat-info-tp23862.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.4.0 compute-classpath.sh

2015-07-15 Thread Marcelo Vanzin
That has never been the correct way to set your app's classpath.

Instead, look at http://spark.apache.org/docs/latest/configuration.html and
search for "extraClassPath".

On Wed, Jul 15, 2015 at 9:43 AM, lokeshkumar  wrote:

> Hi forum
>
> I have downloaded the latest spark version 1.4.0 and started using it.
> But I couldn't find the compute-classpath.sh file in bin/, which I used
> in previous versions to provide third party libraries to my application.
>
> Can anyone please let me know where I can provide CLASSPATH with my third
> party libs in 1.4.0?
>
> Thanks
> Lokesh
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-compute-classpath-sh-tp23860.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


-- 
Marcelo


Re: Research ideas using spark

2015-07-15 Thread Ravindra
Look at this :
http://www.forbes.com/sites/lisabrownlee/2015/07/10/the-11-trillion-internet-of-things-big-data-and-pattern-of-life-pol-analytics/

On Wed, Jul 15, 2015 at 10:19 PM shahid ashraf  wrote:

> Sorry Guys!
>
> I mistakenly added my question to this thread( Research ideas using
> spark). Moreover people can ask any question , this spark user group is for
> that.
>
> Cheers!
> 😊
>
> On Wed, Jul 15, 2015 at 9:43 PM, Robin East 
> wrote:
>
>> Well said Will. I would add that you might want to investigate GraphChi
>> which claims to be able to run a number of large-scale graph processing
>> tasks on a workstation much quicker than a very large Hadoop cluster. It
>> would be interesting to know how widely applicable the approach GraphChi
>> takes is and what implications it has for parallel/distributed computing
>> approaches. A rich seam to mine indeed.
>>
>> Robin
>>
>> On 15 Jul 2015, at 14:48, William Temperley 
>> wrote:
>>
>> There seems to be a bit of confusion here - the OP (doing the PhD) had
>> the thread hijacked by someone with a similar name asking a mundane
>> question.
>>
>> It would be a shame to send someone away so rudely, who may do valuable
>> work on Spark.
>>
>> Sashidar (not Sashid!) I'm personally interested in running graph
>> algorithms for image segmentation using MLlib and Spark.  I've got many
>> questions though - like is it even going to give me a speed-up?  (
>> http://www.frankmcsherry.org/graph/scalability/cost/2015/01/15/COST.html)
>>
>> It's not obvious to me which classes of graph algorithms can be
>> implemented correctly and efficiently in a highly parallel manner.  There's
>> tons of work to be done here, I'm sure. Also, look at parallel geospatial
>> algorithms - there's a lot of work being done on this.
>>
>> Best, Will
>>
>>
>>
>> On 15 July 2015 at 09:01, Vineel Yalamarthy 
>> wrote:
>>
>>> Hi Daniel
>>>
>>> Well said
>>>
>>> Regards
>>> Vineel
>>>
>>> On Tue, Jul 14, 2015, 6:11 AM Daniel Darabos <
>>> daniel.dara...@lynxanalytics.com> wrote:
>>>
>>>> Hi Shahid,
>>>> To be honest I think this question is better suited for Stack Overflow
>>>> than for a PhD thesis.
>>>>
>>>> On Tue, Jul 14, 2015 at 7:42 AM, shahid ashraf
>>>> wrote:
>>>>
>>>>> hi
>>>>>
>>>>> I have a 10 node cluster  i loaded the data onto hdfs, so the no. of
>>>>> partitions i get is 9. I am running a spark application , it gets stuck on
>>>>> one of tasks, looking at the UI it seems application is not using all nodes
>>>>> to do calculations. attached is the screen shot of tasks, it seems tasks
>>>>> are put on each node more then once. looking at tasks 8 tasks get completed
>>>>> under 7-8 minutes and one task takes around 30 minutes so causing the delay
>>>>> in results.
>>>>>
>>>>>
>>>>> On Tue, Jul 14, 2015 at 10:48 AM, Shashidhar Rao <
>>>>> raoshashidhar...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am doing my PHD thesis on large scale machine learning e.g  Online
>>>>>> learning, batch and mini batch learning.
>>>>>>
>>>>>> Could somebody help me with ideas especially in the context of Spark
>>>>>> and to the above learning methods.
>>>>>>
>>>>>> Some ideas like improvement to existing algorithms, implementing new
>>>>>> features especially the above learning methods and algorithms that have not
>>>>>> been implemented etc.
>>>>>>
>>>>>> If somebody could help me with some ideas it would really accelerate
>>>>>> my work.
>>>>>>
>>>>>> Plus few ideas on research papers regarding Spark or Mahout.
>>>>>>
>>>>>> Thanks in advance.
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> with Regards
>>>>> Shahid Ashraf
>>>>>
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>
>>>>
>>
>>
>
>
> --
> with Regards
> Shahid Ashraf
>


spark streaming job to hbase write

2015-07-15 Thread Shushant Arora
Hi

I have a requirement to write to an HBase table from a Spark Streaming app
after some processing.
Is the HBase put operation the only way of writing to HBase, or is there a
specialised connector or RDD in Spark for HBase writes?

Should bulk load to HBase from a streaming app be avoided if the output of each
batch interval is just a few MB?

Thanks


Re: Strange Error: "java.lang.OutOfMemoryError: GC overhead limit exceeded"

2015-07-15 Thread Saeed Shahrivari
Yes there is.
But the RDD is more than 10 TB and compression does not help.

On Wed, Jul 15, 2015 at 8:36 PM, Ted Yu  wrote:

> bq. serializeUncompressed()
>
> Is there a method which enables compression ?
>
> Just wondering if that would reduce the memory footprint.
>
> Cheers
>
> On Wed, Jul 15, 2015 at 8:06 AM, Saeed Shahrivari <
> saeed.shahriv...@gmail.com> wrote:
>
>> I use a simple map/reduce step in a Java/Spark program to remove
>> duplicated documents from a large (10 TB compressed) sequence file
>> containing some html pages. Here is the partial code:
>>
>> JavaPairRDD inputRecords =
>> sc.sequenceFile(args[0], BytesWritable.class, 
>> NullWritable.class).coalesce(numMaps);
>>
>>
>> JavaPairRDD hashDocs = 
>> inputRecords.mapToPair(t ->
>> cacheDocs.add(new Tuple2<>(BaseEncoding.base64()
>> .encode(Hashing.sha1().hashString(doc.getUrl(), 
>> Charset.defaultCharset()).asBytes()), doc));
>> });
>>
>>
>> JavaPairRDD byteArrays =
>> hashDocs.reduceByKey((a, b) -> a.getUrl().length() < b.getUrl().length() ? a : b, numReds).
>> mapToPair(t -> new Tuple2<>(new 
>> BytesWritable(PentV3.buildFromMessage(t._2).serializeUncompressed()),
>> NullWritable.get()));
>>
>>
>> The logic is simple. The map generates a sha-1 signature from the html and 
>> in the reduce phase we keep the html that has the shortest URL. However, 
>> after running for 2-3 hours the application crashes due to memory issue. 
>> Here is the exception:
>>
>> 15/07/15 18:24:05 WARN scheduler.TaskSetManager: Lost task 267.0 in stage 
>> 0.0 (TID 267, psh-11.nse.ir): java.lang.OutOfMemoryError: GC overhead limit 
>> exceeded
>>
>>
>> It seems that the map function keeps the hashDocs RDD in the memory and when 
>> the memory is filled in an executor, the application crashes. Persisting the 
>> map output to disk solves the problem. Adding the following line between map 
>> and reduce solve the issue:
>>
>> hashDocs.persist(StorageLevel.DISK_ONLY());
>>
>>
>> Is this a bug of Spark?
>>
>> How can I tell Spark not to keep even a bit of RDD in the memory?
>>
>>
>> Thanks
>>
>>
>>
>
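
The dedup logic described in the quoted message (hash each page with SHA-1, then keep the document with the shortest URL per hash) can be sketched in plain Java as below. This is only an illustration of the map and reduce steps on an in-memory list, not the poster's Spark job: DedupSketch, Doc, sha1Base64 and dedup are hypothetical names, and the sketch hashes the HTML as the prose says, whereas the quoted code appears to hash the URL.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DedupSketch {
    // Hypothetical stand-in for the poster's document type.
    public static final class Doc {
        public final String url;
        public final String html;
        public Doc(String url, String html) {
            this.url = url;
            this.html = html;
        }
    }

    // "Map" step: Base64-encoded SHA-1 signature of the page content.
    public static String sha1Base64(String s) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            return Base64.getEncoder().encodeToString(md.digest(s.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // "Reduce" step: per signature, keep the document with the shortest URL.
    public static Map<String, Doc> dedup(List<Doc> docs) {
        Map<String, Doc> byHash = new HashMap<>();
        for (Doc d : docs) {
            byHash.merge(sha1Base64(d.html), d,
                    (a, b) -> a.url.length() <= b.url.length() ? a : b);
        }
        return byHash;
    }
}
```

The merge function compares URL lengths explicitly, since Java strings cannot be compared with `<`.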


Re: Research ideas using spark

2015-07-15 Thread Michael Segel
Silly question… 

When thinking about a PhD thesis… do you want to tie it to a specific 
technology or do you want to investigate an idea but then use a specific 
technology. 
Or is this an outdated way of thinking? 

"I am doing my PHD thesis on large scale machine learning e.g  Online learning, 
batch and mini batch learning.”

So before we look at technologies like Spark… could the OP break down a more 
specific concept or idea that he wants to pursue? 

Looking at what Jorn said… 

Using machine learning to better predict workloads in terms of managing 
clusters… This could be interesting… but is it enough for a PhD thesis, or of 
interest to the OP? 


> On Jul 15, 2015, at 9:43 AM, Jörn Franke  wrote:
> 
> Well, one of the strengths of Spark is standardized general distributed 
> processing allowing many different types of processing, such as graph 
> processing, stream processing etc. The limitation is that it is less 
> performant than a system focusing on only one type of processing (e.g. graph 
> processing). I miss - and this may not be Spark specific - some artificial 
> intelligence to manage a cluster, e.g. predicting workloads, how long a job 
> may run based on previously executed similar jobs etc. Furthermore, many 
> optimizations you have to do manually, e.g. Bloom filters, partitioning etc. 
> It would be great to find some intelligence here as well that does this 
> automatically based on previously executed jobs, taking into account that 
> the optimizations themselves change over time. You may also explore feature 
> interaction.
> 
> On Tue, Jul 14, 2015 at 7:19, Shashidhar Rao wrote:
> Hi,
> 
> I am doing my PHD thesis on large scale machine learning e.g  Online 
> learning, batch and mini batch learning.
> 
> Could somebody help me with ideas especially in the context of Spark and to 
> the above learning methods. 
> 
> Some ideas like improvement to existing algorithms, implementing new features 
> especially the above learning methods and algorithms that have not been 
> implemented etc.
> 
> If somebody could help me with some ideas it would really accelerate my work.
> 
> Plus few ideas on research papers regarding Spark or Mahout.
> 
> Thanks in advance.
> 
> Regards 




Re: Unable to use dynamicAllocation if spark.executor.instances is set in spark-defaults.conf

2015-07-15 Thread Kelly, Jonathan
Would there be any problem in having spark.executor.instances (or 
--num-executors) be completely ignored (i.e., even for non-zero values) if 
spark.dynamicAllocation.enabled is true (i.e., rather than throwing an 
exception)?

I can see how the exception would be helpful if, say, you tried to pass both 
"-c spark.executor.instances" (or --num-executors) *and* "-c 
spark.dynamicAllocation.enabled=true" to spark-submit on the command line (as 
opposed to having one of them in spark-defaults.conf and one of them in the 
spark-submit args), but currently there doesn't seem to be any way to 
distinguish between arguments that were actually passed to spark-submit and 
settings that simply came from spark-defaults.conf.

If there were a way to distinguish them, I think the ideal situation would be 
for the validation exception to be thrown only if spark.executor.instances and 
spark.dynamicAllocation.enabled=true were both passed via spark-submit args or 
were both present in spark-defaults.conf, but passing 
spark.dynamicAllocation.enabled=true to spark-submit would take precedence over 
spark.executor.instances configured in spark-defaults.conf, and vice versa.
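
The precedence rule proposed above can be sketched as a small resolver over two layers of settings. To be clear, this is not how Spark's ClientArguments validation behaves; it is only an illustration of the proposal, and ExecutorConfSketch and resolve are hypothetical names. The two maps stand for spark-defaults.conf and the spark-submit arguments.

```java
import java.util.HashMap;
import java.util.Map;

class ExecutorConfSketch {
    static final String INSTANCES = "spark.executor.instances";
    static final String DYNAMIC = "spark.dynamicAllocation.enabled";

    // A single layer conflicts with itself if it sets both options.
    static boolean conflicts(Map<String, String> layer) {
        return layer.containsKey(INSTANCES) && "true".equals(layer.get(DYNAMIC));
    }

    // defaults: settings from spark-defaults.conf; cli: settings passed to spark-submit.
    public static Map<String, String> resolve(Map<String, String> defaults, Map<String, String> cli) {
        if (conflicts(defaults) || conflicts(cli)) {
            throw new IllegalArgumentException(
                    "Explicitly setting the number of executors is not compatible with " + DYNAMIC);
        }
        Map<String, String> merged = new HashMap<>(defaults);
        merged.putAll(cli);
        if ("true".equals(cli.get(DYNAMIC))) {
            // Dynamic allocation requested on the command line wins over a default executor count.
            merged.remove(INSTANCES);
        } else if (cli.containsKey(INSTANCES) && "true".equals(merged.get(DYNAMIC))) {
            // An explicit executor count on the command line wins over a default dynamic allocation.
            merged.put(DYNAMIC, "false");
        }
        return merged;
    }
}
```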

Jonathan Kelly
Elastic MapReduce - SDE
Blackfoot (SEA33) 06.850.F0

From: Jonathan Kelly <jonat...@amazon.com>
Date: Tuesday, July 14, 2015 at 4:23 PM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Unable to use dynamicAllocation if spark.executor.instances is set in 
spark-defaults.conf

I've set up my cluster with a pre-calculated value for spark.executor.instances 
in spark-defaults.conf such that I can run a job and have it maximize the 
utilization of the cluster resources by default. However, if I want to run a 
job with dynamicAllocation (by passing -c spark.dynamicAllocation.enabled=true 
to spark-submit), I get this exception:

Exception in thread "main" java.lang.IllegalArgumentException: Explicitly 
setting the number of executors is not compatible with 
spark.dynamicAllocation.enabled!
at org.apache.spark.deploy.yarn.ClientArguments.parseArgs(ClientArguments.scala:192)
at org.apache.spark.deploy.yarn.ClientArguments.<init>(ClientArguments.scala:59)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:54)
...

The exception makes sense, of course, but ideally I would like it to ignore 
what I've put in spark-defaults.conf for spark.executor.instances if I've 
enabled dynamicAllocation. The most annoying thing about this is that if I have 
spark.executor.instances present in spark-defaults.conf, I cannot figure out 
any way to spark-submit a job with spark.dynamicAllocation.enabled=true without 
getting this error. That is, even if I pass "-c spark.executor.instances=0 -c 
spark.dynamicAllocation.enabled=true", I still get this error because the 
validation in ClientArguments.parseArgs() that's checking for this condition 
simply checks for the presence of spark.executor.instances rather than whether 
or not its value is > 0.

Should the check be changed to allow spark.executor.instances to be set to 0 if 
spark.dynamicAllocation.enabled is true? That would be an OK compromise, but 
I'd really prefer to be able to enable dynamicAllocation simply by setting 
spark.dynamicAllocation.enabled=true rather than by also having to set 
spark.executor.instances to 0.

Thanks,
Jonathan


Re: Spark Intro

2015-07-15 Thread vaquar khan
Totally agree with Hafsa: you need to identify your requirements and
needs before choosing Spark.

If you want fast data access, go with a NoSQL store (MongoDB, Aerospike,
etc.); if you need data analytics, then Spark is best.

Regards,
Vaquar khan
On 14 Jul 2015 20:39, "Hafsa Asif"  wrote:

> Hi,
> I was also in the same situation as we were using MySQL. Let me give some
> clarifications:
> 1. Spark provides a great methodology for big data analysis. So, if you
> want to make your system more analytical and want deep prepared analytical
> methods to analyze your data, then it's a very good option.
> 2. If you want to get rid of old behavior of MS SQL and want to take fast
> responses from database with huge datasets then you can take any NOSQL
> database.
>
> In my case I select Aerospike for data storage and apply Spark analytical
> engine on it. It gives me really good response and I have a plan to go in
> real production with this combination.
>
> Best,
> Hafsa
>
> 2015-07-14 11:49 GMT+02:00 Akhil Das :
>
>> It might take some time to understand the ecosystem. I'm not sure about
>> what kind of environment you are having (like #cores, Memory etc.), To
>> start with, you can basically use a jdbc connector or dump your data as csv
>> and load it into Spark and query it. You get the advantage of caching if
>> you have more memory, also if you have enough cores 4 records are
>> nothing.
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Jul 14, 2015 at 3:09 PM, vinod kumar 
>> wrote:
>>
>>> Hi Akhil
>>>
>>> Is my choice to switch to spark is good? because I don't have enough
>>> information regards limitation and working environment of spark.
>>> I tried spark SQL but it seems it returns data slower than compared to
>>> MsSQL.( I have tested with data which has 4 records)
>>>
>>>
>>>
>>> On Tue, Jul 14, 2015 at 3:50 AM, Akhil Das 
>>> wrote:
>>>
>>>> This is where you can get started
>>>> https://spark.apache.org/docs/latest/sql-programming-guide.html
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Mon, Jul 13, 2015 at 3:54 PM, vinod kumar
>>>> wrote:
>>>>
>>>>>
>>>>> Hi Everyone,
>>>>>
>>>>> I am developing an application which handles bulk data, around
>>>>> millions of records (this may vary as per the user's requirements). As
>>>>> of now I am using MsSqlServer as the back end and it works fine, but
>>>>> when I perform some operations on large data I am getting overflow
>>>>> exceptions. I heard that Spark is a faster computation engine than SQL
>>>>> (correct me if I am wrong), so I thought to switch my application to
>>>>> Spark. Is my decision right?
>>>>> My user environment is:
>>>>> #.Windows 8
>>>>> #.Data in millions.
>>>>> #.Need to perform filtering and sorting operations with aggregations
>>>>> frequently (for analytics).
>>>>>
>>>>> Thanks in advance,
>>>>>
>>>>> Vinod
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Java 8 vs Scala

2015-07-15 Thread vaquar khan
My choice is java 8
On 15 Jul 2015 18:03, "Alan Burlison"  wrote:

> On 15/07/2015 08:31, Ignacio Blasco wrote:
>
>  The main advantage of using scala vs java 8 is being able to use a console
>>
>
> https://bugs.openjdk.java.net/browse/JDK-8043364
>
> --
> Alan Burlison
> --
>
>
>


Re: Research ideas using spark

2015-07-15 Thread vaquar khan
I would suggest studying Spark, Flink and Storm, and preparing your research
paper based on your understanding and findings.

Maybe you will invent a new Spark ☺

Regards,
Vaquar khan
On 16 Jul 2015 00:47, "Michael Segel"  wrote:

> Silly question…
>
> When thinking about a PhD thesis… do you want to tie it to a specific
> technology or do you want to investigate an idea but then use a specific
> technology.
> Or is this an outdated way of thinking?
>
> "I am doing my PHD thesis on large scale machine learning e.g  Online
> learning, batch and mini batch learning.”
>
> So before we look at technologies like Spark… could the OP break down a
> more specific concept or idea that he wants to pursue?
>
> Looking at what Jorn said…
>
> Using machine learning to better predict workloads in terms of managing
> clusters… This could be interesting… but is it enough for a PhD thesis, or
> of interest to the OP?
>
>
> On Jul 15, 2015, at 9:43 AM, Jörn Franke  wrote:
>
> Well, one of the strengths of Spark is standardized general distributed
> processing allowing many different types of processing, such as graph
> processing, stream processing etc. The limitation is that it is less
> performant than a system focusing on only one type of processing (e.g.
> graph processing). I miss - and this may not be Spark specific - some
> artificial intelligence to manage a cluster, e.g. predicting workloads, how
> long a job may run based on previously executed similar jobs etc.
> Furthermore, many optimizations you have to do manually, e.g. Bloom
> filters, partitioning etc. It would be great to find some intelligence here
> as well that does this automatically based on previously executed jobs,
> taking into account that the optimizations themselves change over time.
> You may also explore feature interaction.
>
> On Tue, Jul 14, 2015 at 7:19, Shashidhar Rao wrote:
>
>> Hi,
>>
>> I am doing my PHD thesis on large scale machine learning e.g  Online
>> learning, batch and mini batch learning.
>>
>> Could somebody help me with ideas especially in the context of Spark and
>> to the above learning methods.
>>
>> Some ideas like improvement to existing algorithms, implementing new
>> features especially the above learning methods and algorithms that have not
>> been implemented etc.
>>
>> If somebody could help me with some ideas it would really accelerate my
>> work.
>>
>> Plus few ideas on research papers regarding Spark or Mahout.
>>
>> Thanks in advance.
>>
>> Regards
>>
>
>
>


small accumulator gives out of memory error

2015-07-15 Thread AlexG
When I call the following minimal working example, the accumulator matrix is
32-by-100K, and each executor has 64G but I get an out of memory error:

Exception in thread "main" java.lang.OutOfMemoryError: Requested array size
exceeds VM limit

Here BDM is a Breeze DenseMatrix

object BDMAccumulatorParam extends AccumulatorParam[BDM[Double]] {
  def zero(initialValue: BDM[Double]): BDM[Double] = {
    BDM.zeros[Double](initialValue.rows, initialValue.cols)
  }

  def addInPlace(m1: BDM[Double], m2: BDM[Double]): BDM[Double] = {
    m1 += m2
  }
}

def testfun(mat: IndexedRowMatrix, lhs: DenseMatrix): DenseMatrix = {
  val accum = mat.rows.context.accumulator(
    BDM.zeros[Double](lhs.numRows.toInt, mat.numCols.toInt))(BDMAccumulatorParam)
  mat.rows.foreach(row =>
    accum += BDM.ones[Double](lhs.numRows.toInt, mat.numCols.toInt))
  fromBreeze(accum.value)
}

Any ideas or suggestions on how to avoid this error?
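
As a sanity check on the sizes involved, here is a minimal arithmetic sketch in plain Java (MatrixSizeSketch and its methods are made-up names). It assumes a dense matrix of doubles kept in one backing array, as Breeze's DenseMatrix does, and uses Integer.MAX_VALUE as an upper bound on JVM array length (the practical HotSpot limit is slightly lower). Under those assumptions a 32-by-100K matrix is about 25.6 MB and nowhere near the array limit, so it may be worth checking what lhs.numRows and mat.numCols actually are at runtime.

```java
class MatrixSizeSketch {
    // Number of elements in a dense rows-by-cols matrix, overflow-checked.
    public static long elements(long rows, long cols) {
        return Math.multiplyExact(rows, cols);
    }

    // Bytes needed for the backing array of doubles (8 bytes each), ignoring object headers.
    public static long bytes(long rows, long cols) {
        return Math.multiplyExact(elements(rows, cols), 8L);
    }

    // Whether the matrix could fit in a single JVM array at all.
    public static boolean fitsInOneArray(long rows, long cols) {
        return elements(rows, cols) <= Integer.MAX_VALUE;
    }
}
```

For the stated dimensions, elements(32, 100000) is 3,200,000 and bytes(32, 100000) is 25,600,000.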



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/small-accumulator-gives-out-of-memory-error-tp23864.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




NotSerializableException in spark 1.4.0

2015-07-15 Thread Chen Song
The streaming job has been running OK in 1.2 and 1.3. After I upgraded to
1.4, I started seeing the error below. It appears to fail in the validate
method in StreamingContext. Did anything change in 1.4.0 w.r.t.
DStream checkpointing?

Detailed error from driver:

15/07/15 18:00:39 ERROR yarn.ApplicationMaster: User class threw
exception: *java.io.NotSerializableException:
DStream checkpointing has been enabled but the DStreams with their
functions are not serializable*
Serialization stack:

java.io.NotSerializableException: DStream checkpointing has been enabled
but the DStreams with their functions are not serializable
Serialization stack:

at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)

-- 
Chen Song


Re: Info from the event timeline appears to contradict dstat info

2015-07-15 Thread Tom Hubregtsen
I think I found my answer at https://github.com/kayousterhout/trace-analysis:

"One thing to keep in mind is that Spark does not currently include
instrumentation to measure the time spent reading input data from disk or
writing job output to disk (the 'Output write wait' shown in the waterfall
is time to write shuffle output to disk, which Spark does have
instrumentation for); as a result, the time shown as 'Compute' may include
time using the disk. We have a custom Hadoop branch that measures the time
Hadoop spends transferring data to/from disk, and we are hopeful that
similar timing metrics will someday be included in the Hadoop FileStatistics
API. In the meantime, it is not currently possible to understand how much of
a Spark task's time is spent reading from disk via HDFS."

That said, this might be posted as a footnote at the event timeline to avoid
confusion :)

Best regards,

Tom Hubregtsen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Info-from-the-event-timeline-appears-to-contradict-dstat-info-tp23862p23865.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: NotSerializableException in spark 1.4.0

2015-07-15 Thread Ted Yu
Can you show us your function(s) ?

Thanks

On Wed, Jul 15, 2015 at 12:46 PM, Chen Song  wrote:

> The streaming job has been running OK in 1.2 and 1.3. After I upgraded to
> 1.4, I started seeing the error below. It appears to fail in the validate
> method in StreamingContext. Did anything change in 1.4.0 w.r.t.
> DStream checkpointing?
>
> Detailed error from driver:
>
> 15/07/15 18:00:39 ERROR yarn.ApplicationMaster: User class threw
> exception: *java.io.NotSerializableException: DStream checkpointing has
> been enabled but the DStreams with their functions are not serializable*
> Serialization stack:
>
> java.io.NotSerializableException: DStream checkpointing has been enabled
> but the DStreams with their functions are not serializable
> Serialization stack:
>
> at
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
> at
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
> at
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>
> --
> Chen Song
>
>


Re: Java 8 vs Scala

2015-07-15 Thread Ted Yu
jshell is nice but it is targeting Java 9

Cheers

On Wed, Jul 15, 2015 at 5:31 AM, Alan Burlison 
wrote:

> On 15/07/2015 08:31, Ignacio Blasco wrote:
>
>  The main advantage of using scala vs java 8 is being able to use a console
>>
>
> https://bugs.openjdk.java.net/browse/JDK-8043364
>
> --
> Alan Burlison
> --
>
>
>


Re: spark streaming job to hbase write

2015-07-15 Thread Todd Nist
There are connector packages listed on the Spark Packages web site:

http://spark-packages.org/?q=hbase

HTH.

-Todd

On Wed, Jul 15, 2015 at 2:46 PM, Shushant Arora 
wrote:

> Hi
>
> I have a requirement of writing in hbase table from Spark streaming app
> after some processing.
> Is Hbase put operation the only way of writing to hbase or is there any
> specialised connector or rdd of spark for hbase write.
>
> Should Bulk load to hbase from streaming  app be avoided if output of each
> batch interval is just few mbs?
>
> Thanks
>
>


Re: NotSerializableException in spark 1.4.0

2015-07-15 Thread Tathagata Das
Your streaming job may have been seemingly running ok, but the DStream
checkpointing must have been failing in the background. It would have been
visible in the log4j logs. In 1.4.0, we enabled fast-failure for that so
that checkpointing failures don't get hidden in the background.

The fact that the serialization stack is not being shown correctly, is a
known bug in Spark 1.4.0, but is fixed in 1.4.1 about to come out in the
next couple of days. That should help you to narrow down the culprit
preventing serialization.
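
Until the serialization stack is printed properly, one way to hunt for the culprit is to try Java-serializing candidate functions yourself. The plain-Java sketch below shows the general failure mode: a function that captures a non-serializable object cannot be serialized, while a self-contained one can. It does not use Spark, and SerializationCheck, SerFn and Helper are hypothetical names chosen for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationCheck {
    // Hypothetical stand-in for a function attached to a DStream.
    public interface SerFn extends Serializable {
        String apply(String in);
    }

    // Deliberately not Serializable, e.g. a client or connection handle.
    public static class Helper {
        String tag(String s) {
            return "[" + s + "]";
        }
    }

    // Returns true if Java serialization accepts the object.
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // The lambda captures 'helper', so serializing it drags Helper along and fails.
    public static SerFn capturing() {
        Helper helper = new Helper();
        return s -> helper.tag(s);
    }

    // The lambda captures nothing, so it serializes fine.
    public static SerFn selfContained() {
        return s -> "[" + s + "]";
    }
}
```

The same check applied to each function passed to a DStream operation can narrow down which one holds a reference to something non-serializable.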

On Wed, Jul 15, 2015 at 1:12 PM, Ted Yu  wrote:

> Can you show us your function(s) ?
>
> Thanks
>
> On Wed, Jul 15, 2015 at 12:46 PM, Chen Song 
> wrote:
>
>> The streaming job has been running OK in 1.2 and 1.3. After I upgraded to
>> 1.4, I started seeing the error below. It appears to fail in the validate
>> method in StreamingContext. Did anything change in 1.4.0 w.r.t.
>> DStream checkpointing?
>>
>> Detailed error from driver:
>>
>> 15/07/15 18:00:39 ERROR yarn.ApplicationMaster: User class threw
>> exception: *java.io.NotSerializableException: DStream checkpointing has
>> been enabled but the DStreams with their functions are not serializable*
>> Serialization stack:
>>
>> java.io.NotSerializableException: DStream checkpointing has been enabled
>> but the DStreams with their functions are not serializable
>> Serialization stack:
>>
>> at
>> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
>> at
>> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>> at
>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>>
>> --
>> Chen Song
>>
>>
>


ALS run method versus ALS train versus ALS fit and transform

2015-07-15 Thread Carol McDonald
In the Spark mllib example MovieLensALS.scala, ALS run is used; however, in
the movie recommendation with mllib tutorial, ALS train is used. What is
the difference, and when should you use one versus the other?

val model = new ALS()
  .setRank(params.rank)
  .setIterations(params.numIterations)
  .setLambda(params.lambda)
  .setImplicitPrefs(params.implicitPrefs)
  .setUserBlocks(params.numUserBlocks)
  .setProductBlocks(params.numProductBlocks)
  .run(training)


  val model = ALS.train(training, rank, numIter, lambda)

Also, in org.apache.spark.examples.ml, fit and transform are used. Which
one do you recommend using?

val als = new ALS()
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRank(params.rank)
  .setMaxIter(params.maxIter)
  .setRegParam(params.regParam)
  .setNumBlocks(params.numBlocks)

val model = als.fit(training.toDF())

val predictions = model.transform(test.toDF()).cache()


Re: ALS run method versus ALS train versus ALS fit and transform

2015-07-15 Thread Sean Owen
The first two examples are from the .mllib API. Really, the "new
ALS()...run()" form is underneath both of the first two. In the second
case, you're calling a convenience method that calls something similar
to the first example.

The third example is from the new .ml "pipelines" API. Similar ideas,
but a different API.
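
The relationship between the builder form and the convenience method can be
sketched with a toy class in plain Python. ToyALS and all of its methods are
made up for illustration and do no factorization at all; the only thing
shown is that a static train(...) can simply configure a new instance and
call run(...) on it.

```python
class ToyALS(object):
    """Toy stand-in for a builder-style estimator (hypothetical, not Spark's ALS)."""

    def __init__(self):
        # Arbitrary defaults for the toy example.
        self.rank = 10
        self.iterations = 10
        self.lam = 0.01

    def set_rank(self, rank):
        self.rank = rank
        return self

    def set_iterations(self, iterations):
        self.iterations = iterations
        return self

    def set_lambda(self, lam):
        self.lam = lam
        return self

    def run(self, ratings):
        # A real implementation would train here; the toy just records its settings.
        return {
            "rank": self.rank,
            "iterations": self.iterations,
            "lambda": self.lam,
            "n_ratings": len(ratings),
        }

    @staticmethod
    def train(ratings, rank, iterations, lam):
        # Convenience wrapper: build, configure, run.
        return (ToyALS()
                .set_rank(rank)
                .set_iterations(iterations)
                .set_lambda(lam)
                .run(ratings))


ratings = [(1, 10, 4.0), (1, 11, 3.0), (2, 10, 5.0)]
via_builder = ToyALS().set_rank(5).set_iterations(20).set_lambda(0.1).run(ratings)
via_train = ToyALS.train(ratings, 5, 20, 0.1)
```

Both calls end up with the same configuration, so the choice between them is
mostly about how many parameters you need to set.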

On Wed, Jul 15, 2015 at 9:55 PM, Carol McDonald  wrote:
> In the Spark mllib examples MovieLensALS.scala  ALS run is used, however in
> the movie recommendation with mllib tutorial ALS train is used , What is the
> difference, when should you use one versus the other
>
> val model = new ALS()
>   .setRank(params.rank)
>   .setIterations(params.numIterations)
>   .setLambda(params.lambda)
>   .setImplicitPrefs(params.implicitPrefs)
>   .setUserBlocks(params.numUserBlocks)
>   .setProductBlocks(params.numProductBlocks)
>   .run(training)
>
>
>   val model = ALS.train(training, rank, numIter, lambda)
>
> Also in org.apache.spark.examples.ml  , fit and transform is used. Which one
> do you recommend using ?
>
> val als = new ALS()
>   .setUserCol("userId")
>   .setItemCol("movieId")
>   .setRank(params.rank)
>   .setMaxIter(params.maxIter)
>   .setRegParam(params.regParam)
>   .setNumBlocks(params.numBlocks)
>
> val model = als.fit(training.toDF())
>
> val predictions = model.transform(test.toDF()).cache()
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to use dynamicAllocation if spark.executor.instances is set in spark-defaults.conf

2015-07-15 Thread Sandy Ryza
Hi Jonathan,

This is a problem that has come up for us as well, because we'd like
dynamic allocation to be turned on by default in some setups, but not break
existing users with these properties.  I'm hoping to figure out a way to
reconcile these by Spark 1.5.

-Sandy

On Wed, Jul 15, 2015 at 3:18 PM, Kelly, Jonathan 
wrote:

>   Would there be any problem in having spark.executor.instances (or
> --num-executors) be completely ignored (i.e., even for non-zero values) if
> spark.dynamicAllocation.enabled is true (i.e., rather than throwing an
> exception)?
>
>  I can see how the exception would be helpful if, say, you tried to pass
> both "-c spark.executor.instances" (or --num-executors) *and* "-c
> spark.dynamicAllocation.enabled=true" to spark-submit on the command line
> (as opposed to having one of them in spark-defaults.conf and one of them in
> the spark-submit args), but currently there doesn't seem to be any way to
> distinguish between arguments that were actually passed to spark-submit and
> settings that simply came from spark-defaults.conf.
>
>  If there were a way to distinguish them, I think the ideal situation
> would be for the validation exception to be thrown only if
> spark.executor.instances and spark.dynamicAllocation.enabled=true were both
> passed via spark-submit args or were both present in spark-defaults.conf,
> but passing spark.dynamicAllocation.enabled=true to spark-submit would take
> precedence over spark.executor.instances configured in spark-defaults.conf,
> and vice versa.
>
>
>  Jonathan Kelly
>
> Elastic MapReduce - SDE
>
> Blackfoot (SEA33) 06.850.F0
>
>   From: Jonathan Kelly 
> Date: Tuesday, July 14, 2015 at 4:23 PM
> To: "user@spark.apache.org" 
> Subject: Unable to use dynamicAllocation if spark.executor.instances is
> set in spark-defaults.conf
>
>   I've set up my cluster with a pre-calculated value for
> spark.executor.instances in spark-defaults.conf such that I can run a job
> and have it maximize the utilization of the cluster resources by default.
> However, if I want to run a job with dynamicAllocation (by passing -c
> spark.dynamicAllocation.enabled=true to spark-submit), I get this exception:
>
>  Exception in thread "main" java.lang.IllegalArgumentException:
> Explicitly setting the number of executors is not compatible with
> spark.dynamicAllocation.enabled!
> at
> org.apache.spark.deploy.yarn.ClientArguments.parseArgs(ClientArguments.scala:192)
> at
> org.apache.spark.deploy.yarn.ClientArguments.<init>(ClientArguments.scala:59)
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:54)
>  …
>
>  The exception makes sense, of course, but ideally I would like it to
> ignore what I've put in spark-defaults.conf for spark.executor.instances if
> I've enabled dynamicAllocation. The most annoying thing about this is that
> if I have spark.executor.instances present in spark-defaults.conf, I cannot
> figure out any way to spark-submit a job with
> spark.dynamicAllocation.enabled=true without getting this error. That is,
> even if I pass "-c spark.executor.instances=0 -c
> spark.dynamicAllocation.enabled=true", I still get this error because the
> validation in ClientArguments.parseArgs() that's checking for this
> condition simply checks for the presence of spark.executor.instances rather
> than whether or not its value is > 0.
>
>  Should the check be changed to allow spark.executor.instances to be set
> to 0 if spark.dynamicAllocation.enabled is true? That would be an OK
> compromise, but I'd really prefer to be able to enable dynamicAllocation
> simply by setting spark.dynamicAllocation.enabled=true rather than by also
> having to set spark.executor.instances to 0.
>
>
>  Thanks,
>
> Jonathan
>


Re: Unable to use dynamicAllocation if spark.executor.instances is set in spark-defaults.conf

2015-07-15 Thread Kelly, Jonathan
Thanks! Is there an existing JIRA I should watch?

~ Jonathan

From: Sandy Ryza <sandy.r...@cloudera.com>
Date: Wednesday, July 15, 2015 at 2:27 PM
To: Jonathan Kelly <jonat...@amazon.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Unable to use dynamicAllocation if spark.executor.instances is set 
in spark-defaults.conf

Hi Jonathan,

This is a problem that has come up for us as well, because we'd like dynamic 
allocation to be turned on by default in some setups, but not break existing 
users with these properties.  I'm hoping to figure out a way to reconcile these 
by Spark 1.5.

-Sandy

On Wed, Jul 15, 2015 at 3:18 PM, Kelly, Jonathan 
<jonat...@amazon.com> wrote:
Would there be any problem in having spark.executor.instances (or 
--num-executors) be completely ignored (i.e., even for non-zero values) if 
spark.dynamicAllocation.enabled is true (i.e., rather than throwing an 
exception)?

I can see how the exception would be helpful if, say, you tried to pass both 
"-c spark.executor.instances" (or --num-executors) *and* "-c 
spark.dynamicAllocation.enabled=true" to spark-submit on the command line (as 
opposed to having one of them in spark-defaults.conf and one of them in the 
spark-submit args), but currently there doesn't seem to be any way to 
distinguish between arguments that were actually passed to spark-submit and 
settings that simply came from spark-defaults.conf.

If there were a way to distinguish them, I think the ideal situation would be 
for the validation exception to be thrown only if spark.executor.instances and 
spark.dynamicAllocation.enabled=true were both passed via spark-submit args or 
were both present in spark-defaults.conf, but passing 
spark.dynamicAllocation.enabled=true to spark-submit would take precedence over 
spark.executor.instances configured in spark-defaults.conf, and vice versa.

Jonathan Kelly
Elastic MapReduce - SDE
Blackfoot (SEA33) 06.850.F0

From: Jonathan Kelly <jonat...@amazon.com>
Date: Tuesday, July 14, 2015 at 4:23 PM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Unable to use dynamicAllocation if spark.executor.instances is set in 
spark-defaults.conf

I've set up my cluster with a pre-calculated value for spark.executor.instances 
in spark-defaults.conf such that I can run a job and have it maximize the 
utilization of the cluster resources by default. However, if I want to run a 
job with dynamicAllocation (by passing -c spark.dynamicAllocation.enabled=true 
to spark-submit), I get this exception:

Exception in thread "main" java.lang.IllegalArgumentException: Explicitly 
setting the number of executors is not compatible with 
spark.dynamicAllocation.enabled!
at 
org.apache.spark.deploy.yarn.ClientArguments.parseArgs(ClientArguments.scala:192)
at org.apache.spark.deploy.yarn.ClientArguments.<init>(ClientArguments.scala:59)
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:54)
…

The exception makes sense, of course, but ideally I would like it to ignore 
what I've put in spark-defaults.conf for spark.executor.instances if I've 
enabled dynamicAllocation. The most annoying thing about this is that if I have 
spark.executor.instances present in spark-defaults.conf, I cannot figure out 
any way to spark-submit a job with spark.dynamicAllocation.enabled=true without 
getting this error. That is, even if I pass "-c spark.executor.instances=0 -c 
spark.dynamicAllocation.enabled=true", I still get this error because the 
validation in ClientArguments.parseArgs() that's checking for this condition 
simply checks for the presence of spark.executor.instances rather than whether 
or not its value is > 0.

Should the check be changed to allow spark.executor.instances to be set to 0 if 
spark.dynamicAllocation.enabled is true? That would be an OK compromise, but 
I'd really prefer to be able to enable dynamicAllocation simply by setting 
spark.dynamicAllocation.enabled=true rather than by also having to set 
spark.executor.instances to 0.

Thanks,
Jonathan



RE: Any beginner samples for using ML / MLIB to produce a moving average of a (K, iterable[V])

2015-07-15 Thread Mohammed Guller
I could be wrong, but it looks like the only implementation available right now 
is MultivariateOnlineSummarizer.
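
For the moving average itself, a minimal sketch in plain Python is below. It
is not MLlib code: moving_average, the window size and the sample device
names are all assumptions made for illustration. It computes a trailing
simple moving average per key over a dictionary shaped like the
(K, Iterable[V]) structure in the question.

```python
from collections import deque


def moving_average(values, window):
    """Return the trailing simple moving average of `values` for the given window."""
    buf = deque(maxlen=window)
    total = 0.0
    averages = []
    for v in values:
        if len(buf) == window:
            # The oldest value is about to be evicted by append(); drop it from the sum.
            total -= buf[0]
        buf.append(v)
        total += v
        averages.append(total / len(buf))
    return averages


# Hypothetical sample data: device id -> latencies in seconds.
device_latency = {
    "device-a": [1.0, 2.0, 3.0, 4.0],
    "device-b": [10.0, 20.0],
}

per_device = {k: moving_average(vs, 2) for k, vs in device_latency.items()}
```

On a pair RDD the same function could presumably be applied per key with
mapValues, since each key already carries its full list of values.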

Mohammed

From: Nkechi Achara [mailto:nkach...@googlemail.com]
Sent: Wednesday, July 15, 2015 4:31 AM
To: user@spark.apache.org
Subject: Any beginner samples for using ML / MLIB to produce a moving average 
of a (K, iterable[V])

Hi all,

I am trying to compute summary statistics, specifically the moving average, for 
several devices that each have an array of latencies in seconds, in this kind of 
format:

deviceLatencyMap = [K:String, Iterable[V: Double]]

I understand that there is a MultivariateSummary, but as this is a trait, I 
can't work out what to use in its stead.

If you need any more code, please let me know.

Thanks All.

K



Re: NotSerializableException in spark 1.4.0

2015-07-15 Thread Chen Song
Thanks

Can you point me to the patch to fix the serialization stack? Maybe I can
pull it in and rerun my job.

Chen

On Wed, Jul 15, 2015 at 4:40 PM, Tathagata Das  wrote:

> Your streaming job may have been seemingly running ok, but the DStream
> checkpointing must have been failing in the background. It would have been
> visible in the log4j logs. In 1.4.0, we enabled fast-failure for that so
> that checkpointing failures dont get hidden in the background.
>
> The fact that the serialization stack is not being shown correctly, is a
> known bug in Spark 1.4.0, but is fixed in 1.4.1 about to come out in the
> next couple of days. That should help you to narrow down the culprit
> preventing serialization.
>
> On Wed, Jul 15, 2015 at 1:12 PM, Ted Yu  wrote:
>
>> Can you show us your function(s) ?
>>
>> Thanks
>>
>> On Wed, Jul 15, 2015 at 12:46 PM, Chen Song 
>> wrote:
>>
>>> The streaming job has been running ok in 1.2 and 1.3. After I upgraded
>>> to 1.4, I started seeing error as below. It appears that it fails in
>>> validate method in StreamingContext. Is there anything changed on 1.4.0
>>> w.r.t DStream checkpointint?
>>>
>>> Detailed error from driver:
>>>
>>> 15/07/15 18:00:39 ERROR yarn.ApplicationMaster: User class threw
>>> exception: *java.io.NotSerializableException: DStream checkpointing has
>>> been enabled but the DStreams with their functions are not serializable*
>>> Serialization stack:
>>>
>>> java.io.NotSerializableException: DStream checkpointing has been enabled
>>> but the DStreams with their functions are not serializable
>>> Serialization stack:
>>>
>>> at
>>> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
>>> at
>>> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>>> at
>>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>>>
>>> --
>>> Chen Song
>>>
>>>
>>
>


-- 
Chen Song


Re: Unable to use dynamicAllocation if spark.executor.instances is set in spark-defaults.conf

2015-07-15 Thread Andrew Or
Yeah, we could make it log a warning instead.
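
A rough sketch of the warn-instead-of-throw behaviour, in plain Python rather
than the actual ClientArguments code. resolve_executor_settings is a
hypothetical helper and the dictionary stands in for the merged Spark
configuration; only the two property names come from this thread.

```python
import warnings


def resolve_executor_settings(conf):
    """Return (dynamic_allocation_enabled, num_executors) from a dict of settings."""
    dynamic = conf.get("spark.dynamicAllocation.enabled", "false").lower() == "true"
    instances = conf.get("spark.executor.instances")
    if dynamic and instances is not None:
        # Instead of raising, ignore the explicit executor count and say so.
        warnings.warn(
            "spark.executor.instances=%s is ignored because "
            "spark.dynamicAllocation.enabled is true" % instances
        )
        return True, None
    return dynamic, (int(instances) if instances is not None else None)


# Value from a defaults file plus a flag passed at submit time.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    settings = resolve_executor_settings({
        "spark.executor.instances": "8",
        "spark.dynamicAllocation.enabled": "true",
    })
    warning_count = len(caught)
```

This does not distinguish where each setting came from; it only shows the
simpler option of letting dynamic allocation win and logging the conflict.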

2015-07-15 14:29 GMT-07:00 Kelly, Jonathan :

>  Thanks! Is there an existing JIRA I should watch?
>
>
>  ~ Jonathan
>
>   From: Sandy Ryza 
> Date: Wednesday, July 15, 2015 at 2:27 PM
> To: Jonathan Kelly 
> Cc: "user@spark.apache.org" 
> Subject: Re: Unable to use dynamicAllocation if spark.executor.instances
> is set in spark-defaults.conf
>
>   Hi Jonathan,
>
>  This is a problem that has come up for us as well, because we'd like
> dynamic allocation to be turned on by default in some setups, but not break
> existing users with these properties.  I'm hoping to figure out a way to
> reconcile these by Spark 1.5.
>
>  -Sandy
>
> On Wed, Jul 15, 2015 at 3:18 PM, Kelly, Jonathan 
> wrote:
>
>>   Would there be any problem in having spark.executor.instances (or
>> --num-executors) be completely ignored (i.e., even for non-zero values) if
>> spark.dynamicAllocation.enabled is true (i.e., rather than throwing an
>> exception)?
>>
>>  I can see how the exception would be helpful if, say, you tried to pass
>> both "-c spark.executor.instances" (or --num-executors) *and* "-c
>> spark.dynamicAllocation.enabled=true" to spark-submit on the command line
>> (as opposed to having one of them in spark-defaults.conf and one of them in
>> the spark-submit args), but currently there doesn't seem to be any way to
>> distinguish between arguments that were actually passed to spark-submit and
>> settings that simply came from spark-defaults.conf.
>>
>>  If there were a way to distinguish them, I think the ideal situation
>> would be for the validation exception to be thrown only if
>> spark.executor.instances and spark.dynamicAllocation.enabled=true were both
>> passed via spark-submit args or were both present in spark-defaults.conf,
>> but passing spark.dynamicAllocation.enabled=true to spark-submit would take
>> precedence over spark.executor.instances configured in spark-defaults.conf,
>> and vice versa.
>>
>>
>>  Jonathan Kelly
>>
>> Elastic MapReduce - SDE
>>
>> Blackfoot (SEA33) 06.850.F0
>>
>>   From: Jonathan Kelly 
>> Date: Tuesday, July 14, 2015 at 4:23 PM
>> To: "user@spark.apache.org" 
>> Subject: Unable to use dynamicAllocation if spark.executor.instances is
>> set in spark-defaults.conf
>>
>>I've set up my cluster with a pre-calculated value for
>> spark.executor.instances in spark-defaults.conf such that I can run a job
>> and have it maximize the utilization of the cluster resources by default.
>> However, if I want to run a job with dynamicAllocation (by passing -c
>> spark.dynamicAllocation.enabled=true to spark-submit), I get this exception:
>>
>>  Exception in thread "main" java.lang.IllegalArgumentException:
>> Explicitly setting the number of executors is not compatible with
>> spark.dynamicAllocation.enabled!
>> at
>> org.apache.spark.deploy.yarn.ClientArguments.parseArgs(ClientArguments.scala:192)
>> at
>> org.apache.spark.deploy.yarn.ClientArguments.<init>(ClientArguments.scala:59)
>> at
>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:54)
>>  …
>>
>>  The exception makes sense, of course, but ideally I would like it to
>> ignore what I've put in spark-defaults.conf for spark.executor.instances if
>> I've enabled dynamicAllocation. The most annoying thing about this is that
>> if I have spark.executor.instances present in spark-defaults.conf, I cannot
>> figure out any way to spark-submit a job with
>> spark.dynamicAllocation.enabled=true without getting this error. That is,
>> even if I pass "-c spark.executor.instances=0 -c
>> spark.dynamicAllocation.enabled=true", I still get this error because the
>> validation in ClientArguments.parseArgs() that's checking for this
>> condition simply checks for the presence of spark.executor.instances rather
>> than whether or not its value is > 0.
>>
>>  Should the check be changed to allow spark.executor.instances to be set
>> to 0 if spark.dynamicAllocation.enabled is true? That would be an OK
>> compromise, but I'd really prefer to be able to enable dynamicAllocation
>> simply by setting spark.dynamicAllocation.enabled=true rather than by also
>> having to set spark.executor.instances to 0.
>>
>>
>>  Thanks,
>>
>> Jonathan
>>
>
>


Re: NotSerializableException in spark 1.4.0

2015-07-15 Thread Ted Yu
Should be this one:
[SPARK-7180] [SPARK-8090] [SPARK-8091] Fix a number of
SerializationDebugger bugs and limitations
...
Closes #6625 from tdas/SPARK-7180 and squashes the following commits:

On Wed, Jul 15, 2015 at 2:37 PM, Chen Song  wrote:

> Thanks
>
> Can you point me to the patch to fix the serialization stack? Maybe I can
> pull it in and rerun my job.
>
> Chen
>
> On Wed, Jul 15, 2015 at 4:40 PM, Tathagata Das 
> wrote:
>
>> Your streaming job may have been seemingly running ok, but the DStream
>> checkpointing must have been failing in the background. It would have been
>> visible in the log4j logs. In 1.4.0, we enabled fast-failure for that so
>> that checkpointing failures dont get hidden in the background.
>>
>> The fact that the serialization stack is not being shown correctly, is a
>> known bug in Spark 1.4.0, but is fixed in 1.4.1 about to come out in the
>> next couple of days. That should help you to narrow down the culprit
>> preventing serialization.
>>
>> On Wed, Jul 15, 2015 at 1:12 PM, Ted Yu  wrote:
>>
>>> Can you show us your function(s) ?
>>>
>>> Thanks
>>>
>>> On Wed, Jul 15, 2015 at 12:46 PM, Chen Song 
>>> wrote:
>>>
>>>> The streaming job has been running ok in 1.2 and 1.3. After I upgraded
>>>> to 1.4, I started seeing error as below. It appears that it fails in
>>>> validate method in StreamingContext. Is there anything changed on 1.4.0
>>>> w.r.t DStream checkpointint?
>>>>
>>>> Detailed error from driver:
>>>>
>>>> 15/07/15 18:00:39 ERROR yarn.ApplicationMaster: User class threw
>>>> exception: *java.io.NotSerializableException: DStream checkpointing
>>>> has been enabled but the DStreams with their functions are not
>>>> serializable*
>>>> Serialization stack:
>>>>
>>>> java.io.NotSerializableException: DStream checkpointing has been
>>>> enabled but the DStreams with their functions are not serializable
>>>> Serialization stack:
>>>>
>>>> at
>>>> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
>>>> at
>>>> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>>>> at
>>>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>>>>
>>>> --
>>>> Chen Song
>>>>
>>>>
>>>
>>
>
>
> --
> Chen Song
>
>


Announcing Spark 1.4.1!

2015-07-15 Thread Patrick Wendell
Hi All,

I'm happy to announce the Spark 1.4.1 maintenance release.
We recommend all users on the 1.4 branch upgrade to
this release, which contains several important bug fixes.

Download Spark 1.4.1 - http://spark.apache.org/downloads.html
Release notes - http://spark.apache.org/releases/spark-release-1-4-1.html
Comprehensive list of fixes - http://s.apache.org/spark-1.4.1

Thanks to the 85 developers who worked on this release!

Please contact me directly for errata in the release notes.

- Patrick




Re: NotSerializableException in spark 1.4.0

2015-07-15 Thread Tathagata Das
Spark 1.4.1 just got released! So just download that. Yay for timing.

On Wed, Jul 15, 2015 at 2:47 PM, Ted Yu  wrote:

> Should be this one:
> [SPARK-7180] [SPARK-8090] [SPARK-8091] Fix a number of
> SerializationDebugger bugs and limitations
> ...
> Closes #6625 from tdas/SPARK-7180 and squashes the following commits:
>
> On Wed, Jul 15, 2015 at 2:37 PM, Chen Song  wrote:
>
>> Thanks
>>
>> Can you point me to the patch to fix the serialization stack? Maybe I can
>> pull it in and rerun my job.
>>
>> Chen
>>
>> On Wed, Jul 15, 2015 at 4:40 PM, Tathagata Das 
>> wrote:
>>
>>> Your streaming job may have been seemingly running ok, but the DStream
>>> checkpointing must have been failing in the background. It would have been
>>> visible in the log4j logs. In 1.4.0, we enabled fast-failure for that so
>>> that checkpointing failures dont get hidden in the background.
>>>
>>> The fact that the serialization stack is not being shown correctly, is a
>>> known bug in Spark 1.4.0, but is fixed in 1.4.1 about to come out in the
>>> next couple of days. That should help you to narrow down the culprit
>>> preventing serialization.
>>>
>>> On Wed, Jul 15, 2015 at 1:12 PM, Ted Yu  wrote:
>>>
>>>> Can you show us your function(s) ?
>>>>
>>>> Thanks
>>>>
>>>> On Wed, Jul 15, 2015 at 12:46 PM, Chen Song
>>>> wrote:
>>>>
>>>>> The streaming job has been running ok in 1.2 and 1.3. After I upgraded
>>>>> to 1.4, I started seeing error as below. It appears that it fails in
>>>>> validate method in StreamingContext. Is there anything changed on 1.4.0
>>>>> w.r.t DStream checkpointint?
>>>>>
>>>>> Detailed error from driver:
>>>>>
>>>>> 15/07/15 18:00:39 ERROR yarn.ApplicationMaster: User class threw
>>>>> exception: *java.io.NotSerializableException: DStream checkpointing
>>>>> has been enabled but the DStreams with their functions are not
>>>>> serializable*
>>>>> Serialization stack:
>>>>>
>>>>> java.io.NotSerializableException: DStream checkpointing has been
>>>>> enabled but the DStreams with their functions are not serializable
>>>>> Serialization stack:
>>>>>
>>>>> at
>>>>> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
>>>>> at
>>>>> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>>>>> at
>>>>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>>>>>
>>>>> --
>>>>> Chen Song
>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Chen Song
>>
>>
>


get java.io.FileNotFoundException when use addFile Function

2015-07-15 Thread prateek arora
I am trying to write a simple program using the addFile function, but I am
getting an error on my worker node saying that the file does not exist:

stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost
task 0.3 in stage 0.0 (TID 3, slave2.novalocal):
java.io.FileNotFoundException: File
file:/tmp/spark-2791415c-8c20-4920-b3cd-5a6b8b6f3f8d/userFiles-a5a98f06-2d38-4876-8c8c-82a10ac5431f/csv_ip.csv
does not exist

The code is as follows:

val sc = new SparkContext(sparkConf)
val inputFile ="file:///home/ubuntu/test/Spark_CSV/spark_csv_job/csv_ip.csv"
sc.addFile(inputFile)
val inFile = sc.textFile("file://"+SparkFiles.get("csv_ip.csv"))
inFile.take(10).foreach(println)

Please help me resolve this error. Thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/get-java-io-FileNotFoundException-when-use-addFile-Function-tp23867.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




get java.io.FileNotFoundException when use addFile Function

2015-07-15 Thread prateek arora
Hi

I am trying to write a simple program using the addFile function, but I am
getting an error on my worker node saying that the file does not exist:

stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost
task 0.3 in stage 0.0 (TID 3, slave2.novalocal):
java.io.FileNotFoundException: File
file:/tmp/spark-2791415c-8c20-4920-b3cd-5a6b8b6f3f8d/userFiles-a5a98f06-2d38-4876-8c8c-82a10ac5431f/csv_ip.csv
does not exist

The code is as follows:

val sc = new SparkContext(sparkConf)
val inputFile = "file:///home/ubuntu/test/Spark_CSV/spark_csv_job/csv_ip.csv"
sc.addFile(inputFile)
val inFile = sc.textFile("file://"+SparkFiles.get("csv_ip.csv"))
inFile.take(10).foreach(println)

Please help me resolve this error. Thanks in advance.


Regards
prateek


Python DataFrames, length of array

2015-07-15 Thread pedro
Based on the list of functions here:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

there doesn't seem to be a way to get the length of an array in a dataframe
without defining a UDF.

What I would be looking for is something like this (except length_udf would
be pyspark.sql.functions.length or something similar):

length_udf = UserDefinedFunction(len, IntegerType())
test_schema = StructType([
    StructField('arr', ArrayType(IntegerType())),
    StructField('letter', StringType())
])
test_df = sql.createDataFrame(sc.parallelize([
    [[1, 2, 3], 'a'],
    [[4, 5, 6, 7, 8], 'b']
]), test_schema)
test_df.select(length_udf(test_df.arr)).collect()

Output:
[Row(PythonUDF#len(arr)=3), Row(PythonUDF#len(arr)=5)]

Is there currently a way to accomplish this? If this doesn't exist and seems
useful, I would be happy to contribute a PR with the function.

Pedro Rodriguez



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-DataFrames-length-of-array-tp23868.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Python DataFrames: length of ArrayType

2015-07-15 Thread pedro
Resubmitting after fixing subscription to mailing list.

Based on the list of functions here:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

there doesn't seem to be a way to get the length of an array in a dataframe
without defining a UDF. 

What I would be looking for is something like this (except length_udf would
be pyspark.sql.functions.length or something similar):

length_udf = UserDefinedFunction(len, IntegerType())
test_schema = StructType([
    StructField('arr', ArrayType(IntegerType())),
    StructField('letter', StringType())
])
test_df = sql.createDataFrame(sc.parallelize([
    [[1, 2, 3], 'a'],
    [[4, 5, 6, 7, 8], 'b']
]), test_schema)
test_df.select(length_udf(test_df.arr)).collect()

Output: 
[Row(PythonUDF#len(arr)=3), Row(PythonUDF#len(arr)=5)] 

Is there currently a way to accomplish this? If this doesn't exist and seems
useful, I would be happy to contribute a PR with the function. 

Pedro Rodriguez



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-DataFrames-length-of-ArrayType-tp23869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: NotSerializableException in spark 1.4.0

2015-07-15 Thread Chen Song
Ah, cool. Thanks.

On Wed, Jul 15, 2015 at 5:58 PM, Tathagata Das  wrote:

> Spark 1.4.1 just got released! So just download that. Yay for timing.
>
> On Wed, Jul 15, 2015 at 2:47 PM, Ted Yu  wrote:
>
>> Should be this one:
>> [SPARK-7180] [SPARK-8090] [SPARK-8091] Fix a number of
>> SerializationDebugger bugs and limitations
>> ...
>> Closes #6625 from tdas/SPARK-7180 and squashes the following commits:
>>
>> On Wed, Jul 15, 2015 at 2:37 PM, Chen Song 
>> wrote:
>>
>>> Thanks
>>>
>>> Can you point me to the patch to fix the serialization stack? Maybe I
>>> can pull it in and rerun my job.
>>>
>>> Chen
>>>
>>> On Wed, Jul 15, 2015 at 4:40 PM, Tathagata Das 
>>> wrote:
>>>
>>>> Your streaming job may have been seemingly running ok, but the DStream
>>>> checkpointing must have been failing in the background. It would have been
>>>> visible in the log4j logs. In 1.4.0, we enabled fast-failure for that so
>>>> that checkpointing failures dont get hidden in the background.
>>>>
>>>> The fact that the serialization stack is not being shown correctly, is
>>>> a known bug in Spark 1.4.0, but is fixed in 1.4.1 about to come out in the
>>>> next couple of days. That should help you to narrow down the culprit
>>>> preventing serialization.
>>>>
>>>> On Wed, Jul 15, 2015 at 1:12 PM, Ted Yu  wrote:
>>>>
>>>>> Can you show us your function(s) ?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Wed, Jul 15, 2015 at 12:46 PM, Chen Song
>>>>> wrote:
>>>>>
>>>>>> The streaming job has been running ok in 1.2 and 1.3. After I
>>>>>> upgraded to 1.4, I started seeing error as below. It appears that it
>>>>>> fails
>>>>>> in validate method in StreamingContext. Is there anything changed on
>>>>>> 1.4.0
>>>>>> w.r.t DStream checkpointint?
>>>>>>
>>>>>> Detailed error from driver:
>>>>>>
>>>>>> 15/07/15 18:00:39 ERROR yarn.ApplicationMaster: User class threw
>>>>>> exception: *java.io.NotSerializableException: DStream checkpointing
>>>>>> has been enabled but the DStreams with their functions are not
>>>>>> serializable*
>>>>>> Serialization stack:
>>>>>>
>>>>>> java.io.NotSerializableException: DStream checkpointing has been
>>>>>> enabled but the DStreams with their functions are not serializable
>>>>>> Serialization stack:
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
>>>>>> at
>>>>>> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>>>>>> at
>>>>>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>>>>>>
>>>>>> --
>>>>>> Chen Song
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Chen Song
>>>
>>>
>>
>


-- 
Chen Song


Spark cluster read local files

2015-07-15 Thread Julien Beaudan

Hi all,

Is it possible to use Spark to assign each machine in a cluster the same 
task, but on files in each machine's local file system, and then have 
the results sent back to the driver program?


Thank you in advance!

Julien





Re: Java 8 vs Scala

2015-07-15 Thread Alan Burlison

On 15/07/2015 21:17, Ted Yu wrote:


> jshell is nice but it is targeting Java 9


Yes I know, just pointing out that eventually Java would have a REPL as 
well.


--
Alan Burlison
--




Re: Spark and HDFS

2015-07-15 Thread Marcelo Vanzin
On Wed, Jul 15, 2015 at 5:36 AM, Jeskanen, Elina 
wrote:

>  I have Spark 1.4 on my local machine and I would like to connect to our
> local 4 nodes Cloudera cluster. But how?
>
>
>
> In the example it says text_file = spark.textFile("hdfs://..."), but can
> you advise me in where to get this "hdfs://..." -address?
>

In 99% of the cases, you shouldn't need it. Just set "HADOOP_CONF_DIR" to a
directory containing your HDFS configuration, and just pass the path to the
files to process (without the "hdfs:..." prefix) to that API.

-- 
Marcelo


RE: Python DataFrames: length of ArrayType

2015-07-15 Thread Cheng, Hao
Actually, it's supposed to be part of the Spark 1.5 release; see
https://issues.apache.org/jira/browse/SPARK-8230
You're definitely welcome to contribute to it; let me know if you have any
questions on implementing it.
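
As a rough sketch of how this could look once 1.5 is available, assuming the
function from that ticket ships as pyspark.sql.functions.size (the wrapper
name with_array_length and the default column name are only illustrative):

```python
def with_array_length(df, array_col, out_col="n"):
    """Return df with an extra integer column holding the length of array_col."""
    # Imported lazily so this module still loads where PySpark is not installed.
    # Assumption: Spark 1.5 or later, where `size` is expected to be available.
    from pyspark.sql.functions import size

    return df.withColumn(out_col, size(df[array_col]))
```

Under that assumption, with_array_length(test_df, 'arr').collect() would take
the place of the UDF-based select in the original question.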

Cheng Hao


-----Original Message-----
From: pedro [mailto:ski.rodrig...@gmail.com] 
Sent: Thursday, July 16, 2015 7:31 AM
To: user@spark.apache.org
Subject: Python DataFrames: length of ArrayType

Resubmitting after fixing subscription to mailing list.

Based on the list of functions here:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

there doesn't seem to be a way to get the length of an array in a dataframe 
without defining a UDF. 

What I would be looking for is something like this (except length_udf would be 
pyspark.sql.functions.length or something similar):

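from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

# Wrap Python's built-in len() as a UDF that returns an integer.
length_udf = UserDefinedFunction(len, IntegerType())
test_schema = StructType([
    StructField('arr', ArrayType(IntegerType())),
    StructField('letter', StringType())
])
# `sc` and `sql` are the existing SparkContext and SQLContext.
test_df = sql.createDataFrame(sc.parallelize([
    [[1, 2, 3], 'a'],
    [[4, 5, 6, 7, 8], 'b']
]), test_schema)
test_df.select(length_udf(test_df.arr)).collect()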

Output: 
[Row(PythonUDF#len(arr)=3), Row(PythonUDF#len(arr)=5)] 

Is there currently a way to accomplish this? If this doesn't exist and seems 
useful, I would be happy to contribute a PR with the function. 

Pedro Rodriguez



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-DataFrames-length-of-ArrayType-tp23869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: HiveThriftServer2.startWithContext error with registerTempTable

2015-07-15 Thread Srikanth
Hello,

Re-sending this to see if I'm second time lucky!
I've not managed to move past this error.

Srikanth

On Mon, Jul 13, 2015 at 9:14 PM, Srikanth  wrote:

> Hello,
>
> I want to expose the result of a Spark computation to external tools. I plan
> to do this through the Thrift server's JDBC interface by registering the
> result DataFrame as a temp table.
> I wrote a sample program in spark-shell to test this:
>
> import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
>
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> import hiveContext.implicits._
> HiveThriftServer2.startWithContext(hiveContext)
> val myDF =
>   hiveContext.read.format("com.databricks.spark.csv").option("header", "true").load("/datafolder/weblog/pages.csv")
> myDF.registerTempTable("temp_table")
>
>
> I'm able to see the temp table in Beeline
>
> +-------------+--------------+
> | tableName   | isTemporary  |
> +-------------+--------------+
> | temp_table  | true         |
> | my_table    | false        |
> +-------------+--------------+
>
>
> Now when I issue "select * from temp_table" from Beeline, I see the
> exception below in spark-shell:
>
> 15/07/13 17:18:27 WARN ThriftCLIService: Error executing statement:
> org.apache.hive.service.cli.HiveSQLException: java.lang.ClassNotFoundException: com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$1$$anonfun$1
>     at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:206)
>     at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
>     at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> I'm able to read the other table ("my_table") from Beeline, though.
> Any suggestions on how to overcome this?
>
> This is with the Spark 1.4 pre-built version. Spark-shell was started with
> --packages to pass spark-csv.
>
> Srikanth
>


Possible to combine all RDDs from a DStream batch into one?

2015-07-15 Thread Jon Chase
I'm currently doing something like this in my Spark Streaming program
(Java):

dStream.foreachRDD((rdd, batchTime) -> {
log.info("processing RDD from batch {}", batchTime);

// my rdd processing code

});

Instead of having my rdd processing code called once for each RDD in the
batch, is it possible to essentially group all of the RDDs from the batch
into a single RDD and single partition and therefore operate on all of the
elements in the batch at once?

My goal here is to do an operation exactly once for every batch.  As I
understand it, foreachRDD is going to do the operation once for each RDD in
the batch, which is not what I want.

I've looked at DStream.repartition(int), but the docs make it sound like it
only changes the number of partitions in the batch's existing RDDs, not the
number of RDDs.
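
For what it's worth, a DStream yields exactly one RDD per batch interval, so
the foreachRDD callback already runs once per batch; what varies is the number
of partitions inside that RDD. A minimal sketch in Python (PySpark's
foreachRDD passes the batch time first, then the RDD), assuming the batch is
small enough to collect to the driver; make_batch_handler and sink are
hypothetical names used only for illustration:

```python
def make_batch_handler(sink):
    """Build a foreachRDD callback that hands each non-empty batch to sink once."""

    def handle(batch_time, rdd):
        # collect() pulls every element of this batch's single RDD to the
        # driver, regardless of how many partitions the RDD has.
        records = rdd.collect()
        if records:
            sink(batch_time, records)

    return handle


# Wiring (needs a live StreamingContext, so it is left commented out):
# dstream.foreachRDD(make_batch_handler(my_sink))
```

If the work should stay on the executors instead, rdd.coalesce(1) followed by
foreachPartition is another option under the same small-batch assumption.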


Re: Possible to combine all RDDs from a DStream batch into one?

2015-07-15 Thread Jon Chase
I should note that the amount of data in each batch is very small, so I'm
not concerned with performance implications of grouping into a single RDD.

On Wed, Jul 15, 2015 at 9:58 PM, Jon Chase  wrote:

> I'm currently doing something like this in my Spark Streaming program
> (Java):
>
> dStream.foreachRDD((rdd, batchTime) -> {
> log.info("processing RDD from batch {}", batchTime);
> 
> // my rdd processing code
> 
> });
>
> Instead of having my rdd processing code called once for each RDD in the
> batch, is it possible to essentially group all of the RDDs from the batch
> into a single RDD and single partition and therefore operate on all of the
> elements in the batch at once?
>
> My goal here is to do an operation exactly once for every batch.  As I
> understand it, foreachRDD is going to do the operation once for each RDD in
> the batch, which is not what I want.
>
> I've looked at DStream.repartition(int), but the docs make it sound like
> it only changes the number of partitions in the batch's existing RDDs, not
> the number of RDDs.
>

