Re: Is the disk space in SPARK_LOCAL_DIRS cleanned up?

2015-04-13 Thread Marius Soutier
I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example:

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true 
-Dspark.worker.cleanup.appDataTtl="

> On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) wrote:
> 
> Does anybody have an answer for this?
>  
> Thanks
> Ningjun
>  
> From: Wang, Ningjun (LNG-NPV) 
> Sent: Thursday, April 02, 2015 12:14 PM
> To: user@spark.apache.org 
> Subject: Is the disk space in SPARK_LOCAL_DIRS cleanned up?
>  
> I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, 
> Spark writes to this folder. I found that the disk usage of this folder keeps 
> increasing quickly, and at some point I will run out of disk space. 
>  
> Does Spark clean up the disk space in this folder once the shuffle 
> operation is done? If not, I need to write a job to clean it up myself. But 
> how do I know which subfolders can be removed?
>  
> Ningjun
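
One possible way to approach the "which subfolders can be removed" question is an age-based check, similar in spirit to a TTL setting. The sketch below is plain Python, not a Spark feature: the function name stale_subdirs and the TTL value are made up for illustration, and it only lists candidates rather than deleting anything. It assumes that a directory untouched for longer than the TTL belongs to a finished application, which may not hold for long-running jobs.

```python
import os
import tempfile
import time


def stale_subdirs(root, ttl_seconds, now=None):
    """Return subdirectories of root whose mtime is older than ttl_seconds."""
    now = time.time() if now is None else now
    stale = []
    for entry in os.scandir(root):
        if entry.is_dir(follow_symlinks=False):
            age = now - entry.stat(follow_symlinks=False).st_mtime
            if age > ttl_seconds:
                stale.append(entry.path)
    return sorted(stale)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        old_dir = os.path.join(root, "spark-old")
        new_dir = os.path.join(root, "spark-new")
        os.mkdir(old_dir)
        os.mkdir(new_dir)
        # Backdate one directory by two days to simulate leftover data.
        two_days_ago = time.time() - 2 * 24 * 3600
        os.utime(old_dir, (two_days_ago, two_days_ago))
        # Only the backdated directory is reported with a one-day TTL.
        print(stale_subdirs(root, ttl_seconds=24 * 3600))
```

Note that a directory's modification time changes only when entries are added to or removed from it directly, so a stricter check would also look at the files inside each candidate.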



Re: regarding ZipWithIndex

2015-04-13 Thread Jeetendra Gangele
How about using mapToPair and swapping the two elements? Will it be efficient
to convert it like this? The code is below.


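// The element type of matchRdd was lost from the archived message;
// VendorData below is a placeholder for it.
JavaPairRDD<Long, MatcherReleventData> RddForMarch =
    matchRdd.zipWithIndex().mapToPair(
        new PairFunction<Tuple2<VendorData, Long>, Long, MatcherReleventData>() {

    @Override
    public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorData, Long> t)
            throws Exception {
        MatcherReleventData matcherData = new MatcherReleventData();
        // Swap the pair so that the index becomes the key.
        return new Tuple2<Long, MatcherReleventData>(
            t._2, matcherData.convertVendorDataToMatcherData(t._1));
    }

}).cache();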
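
For readers who want to see the index-then-swap idea in isolation, here is a small plain-Python sketch that runs on an ordinary list rather than an RDD. The function name index_then_swap and the start parameter are illustrative only. It assumes the indexes produced by zipWithIndex begin at 0, so a shift is needed if numbering should begin at 1 as asked in the original question.

```python
def index_then_swap(items, start=1):
    """Pair each item with its position, then put the index first."""
    # Step 1: (value, index) pairs with 0-based indexes, in input order.
    zipped = [(item, i) for i, item in enumerate(items)]
    # Step 2: swap each pair and shift the index so numbering begins at start.
    return [(i + start, item) for item, i in zipped]


if __name__ == "__main__":
    print(index_then_swap(["first", "second", "third"]))
```

The swap itself is a cheap per-element operation; it does not reorder or regroup the data.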

On 13 April 2015 at 03:11, Ted Yu  wrote:

> Please also take a look at ZippedWithIndexRDDPartition which is 72 lines
> long.
>
> You can create your own version which extends RDD[(Long, T)]
>
> Cheers
>
> On Sun, Apr 12, 2015 at 1:29 PM, Ted Yu  wrote:
>
>> bq. will return something like JavaPairRDD
>>
>> The long component of the pair fits your description of index. What other
>> requirement does ZipWithIndex not provide you ?
>>
>> Cheers
>>
>> On Sun, Apr 12, 2015 at 1:16 PM, Jeetendra Gangele 
>> wrote:
>>
>>> Hi All I have an RDD JavaRDD and I want to convert it to
>>> JavaPairRDD.. Index should be unique and it should maintain
>>> the order. For first object It should have 1 and then for second 2 like
>>> that.
>>>
>>> I tried using ZipWithIndex but it will return something like
>>> JavaPairRDD
>>> I wanted to use this RDD for lookup and join operation later in my
>>> workflow so ordering is important.
>>>
>>>
>>> Regards
>>> jeet
>>>
>>
>>
>


Manning looking for a co-author for the GraphX in Action book

2015-04-13 Thread Reynold Xin
Hi all,

Manning (the publisher) is looking for a co-author for the GraphX in Action
book. The book currently has one author (Michael Malak), but they are
looking for a co-author to work closely with Michael and improve the
writings and make it more consumable.

Early access page for the book: http://www.manning.com/malak/

Let me know if you are interested in that. Cheers.


RE: Kryo exception : Encountered unregistered class ID: 13994

2015-04-13 Thread mehdisinger
Hello,

Thank you for your answer.

I'm already registering my classes as you're suggesting...

Regards

From: tsingfu [via Apache Spark User List] 
[mailto:ml-node+s1001560n22468...@n3.nabble.com]
Sent: Monday, April 13, 2015 03:48
To: Mehdi Singer
Subject: Re: Kryo exception : Encountered unregistered class ID: 13994

Hi,
the error message says:
>com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
>13994

So I think this is an issue with Kryo. You should use 
`kryo.register(classOf[your_class_name])` in your app code.


--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-exception-Encountered-unregistered-class-ID-13994-tp22437p22471.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Kryo exception : Encountered unregistered class ID: 13994

2015-04-13 Thread ๏̯͡๏
You need to do a few more things, or you will eventually run into these issues:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
  .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
  .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))

-Deepak

On Mon, Apr 13, 2015 at 1:19 PM, mehdisinger 
wrote:

>  Hello,
>
>
>
> Thank you for your answer.
>
>
>
> I’m already registering my classes as you’re suggesting…
>
>
>
> Regards
>
>
>
> *From:* tsingfu [via Apache Spark User List] [mailto:ml-node+[hidden email]
> ]
> *Sent:* Monday, April 13, 2015 03:48
> *To:* Mehdi Singer
> *Subject:* Re: Kryo exception : Encountered unregistered class ID: 13994
>
>
>
> Hi,
> error message is mentioned:
> >com.esotericsoftware.kryo.KryoException: Encountered unregistered class
> ID: 13994
>
> So I think this is issue with kryo, You should use
> `kryo.register(classOf[your_class_name])` in your app code.
>



-- 
Deepak


Spark Cluster: RECEIVED SIGNAL 15: SIGTERM

2015-04-13 Thread James King
Any idea what this means? Many thanks.

==>
logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1
<==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 4
cores, 6.6 GB RAM
15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home:
/remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:8081
15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' on
port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at
http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master
akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with master
spark://08:7077
*15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM*


Re: Is the disk space in SPARK_LOCAL_DIRS cleanned up?

2015-04-13 Thread Guillaume Pitel
Does it also clean up the Spark local dirs? I thought it only cleaned 
$SPARK_HOME/work/


Guillaume

I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example:

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true 
-Dspark.worker.cleanup.appDataTtl="


On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) <ningjun.w...@lexisnexis.com> wrote:


Does anybody have an answer for this?
Thanks
Ningjun
*From:*Wang, Ningjun (LNG-NPV)
*Sent:*Thursday, April 02, 2015 12:14 PM
*To:*user@spark.apache.org 
*Subject:*Is the disk space in SPARK_LOCAL_DIRS cleanned up?
I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are 
shuffled, Spark writes to this folder. I found that the disk usage of 
this folder keeps increasing quickly, and at some point I will run 
out of disk space.
Does Spark clean up the disk space in this folder once the 
shuffle operation is done? If not, I need to write a job to clean it 
up myself. But how do I know which subfolders can be removed?

Ningjun





--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. 
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM

2015-04-13 Thread Guillaume Pitel

Very likely to be this :
http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2

Your worker ran out of memory => maybe you're asking for too much memory 
for the JVM, or something else is running on the worker


Guillaume

Any idea what this means, many thanks

==> 
logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1 
<==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 
4 cores, 6.6 GB RAM

15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home: 
/remote/users//work/tools/spark-1.3.0-bin-hadoop2.4

15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:8081 

15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' 
on port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at 
http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master 
akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with master 
spark://08:7077

*15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM*
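
As a small aid for reading the last log line: signal 15 is SIGTERM, a termination request that a process can catch and log (unlike signal 9, SIGKILL, which cannot be caught). The log line says that the worker was asked to stop, but not who asked. The sketch below is plain Python with a made-up helper name, signal_from_log; it only decodes the number found in such a line.

```python
import re
import signal

LOG_LINE = "15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM"


def signal_from_log(line):
    """Extract the signal number from a log line and look up its name."""
    match = re.search(r"RECEIVED SIGNAL (\d+)", line)
    if match is None:
        return None
    number = int(match.group(1))
    return number, signal.Signals(number).name


if __name__ == "__main__":
    print(signal_from_log(LOG_LINE))
```

Finding the sender usually means checking system logs or any stop scripts and cluster management tools that run on the same host.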







Re: Spark 1.3.0: Running Pi example on YARN fails

2015-04-13 Thread Zork Sail
Hi Zhan,
Alas, setting

-Dhdp.version=2.2.0.0-2041

does not help. I still get the same error:
15/04/13 09:53:59 INFO yarn.Client:
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1428918838408
 final status: UNDEFINED
 tracking URL:
http://foo.bar.site:8088/proxy/application_1427875242006_0037/
 user: test
15/04/13 09:54:00 INFO yarn.Client: Application report for
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:01 INFO yarn.Client: Application report for
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:02 INFO yarn.Client: Application report for
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:03 INFO yarn.Client: Application report for
application_1427875242006_0037 (state: FAILED)
15/04/13 09:54:03 INFO yarn.Client:
 client token: N/A
 diagnostics: Application application_1427875242006_0037 failed 2 times
due to AM Container for appattempt_1427875242006_0037_02 exited with
exitCode: 1
For more detailed output, check application tracking page:
http://foo.bar.site:8088/proxy/application_1427875242006_0037/Then, click
on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1427875242006_0037_02_01
Exit code: 1
Exception message:
/mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh:
line 27:
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
bad substitution

Stack trace: ExitCodeException exitCode=1:
/mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh:
line 27:
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
bad substitution

at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1428918838408
 final status: FAILED
 tracking URL:
http://foo.bar.site:8088/cluster/app/application_1427875242006_0037
 user: test
Exception in thread "main" org.apache.spark.SparkException: Application
finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:622)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:647)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:
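
A note on the "bad substitution" message in the log above: bash reports it for ${hdp.version} because a dot is not allowed in a shell variable name, so the placeholder has to be replaced with a concrete version before launch_container.sh is generated. The plain-Python sketch below (the helper name dotted_placeholders is made up, and the classpath is shortened from the log) shows one way to spot such leftover placeholders in a classpath string.

```python
import re

# Matches ${...} where the name contains a dot, which bash rejects.
DOTTED_PLACEHOLDER = re.compile(r"\$\{([^}]*\.[^}]*)\}")


def dotted_placeholders(text):
    """Return the distinct ${a.b}-style names left unresolved in text."""
    return sorted(set(DOTTED_PLACEHOLDER.findall(text)))


CLASSPATH = (
    "$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:"
    "/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:"
    "/etc/hadoop/conf/secure"
)

if __name__ == "__main__":
    print(dotted_placeholders(CLASSPATH))  # ['hdp.version']
```

Plain variables such as $PWD and $HADOOP_CONF_DIR are valid shell and are not flagged.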

Re: Problem getting program to run on 15TB input

2015-04-13 Thread Daniel Mahler
Sometimes a large number of partitions leads to memory problems.
Something like

val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.


On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra  wrote:

> Everything works smoothly if I do the 99%-removal filter in Hive first.
> So, all the baggage from garbage collection was breaking it.
>
> Is there a way to filter() out 99% of the data without having to garbage
> collect 99% of the RDD?
>
> On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra  wrote:
>
>> I tried a shorter, simpler version of the program, with just 1 RDD.
>> Essentially it is:
>>
>> sc.textFile(..., N).map().filter().map( blah => (id,
>> 1L)).reduceByKey().saveAsTextFile(...)
>>
>> Here is a typical GC log trace from one of the yarn container logs:
>>
>> 54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
>> 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
>> real=0.02 secs]
>> 77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
>> 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
>> real=0.04 secs]
>> 79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
>> 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
>> real=0.08 secs]
>> 92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
>> 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
>> real=0.02 secs]
>> 114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
>> 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
>> real=0.02 secs]
>> 117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
>> 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
>> real=0.02 secs]
>>
>> So ~9GB is getting GC'ed every few seconds. Which seems like a lot.
>>
>> Question: The filter() is removing 99% of the data. Does this 99% of the
>> data get GC'ed?
>>
>> Now, I was able to finally get to reduceByKey() by reducing the number of
>> executor-cores (to 2), based on suggestions at
>> http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
>> . This makes everything before reduceByKey() run pretty smoothly.
>>
>> I ran this with more executor-memory and fewer executors (the most important
>> thing was fewer executor-cores):
>>
>> --num-executors 150 \
>> --driver-memory 15g \
>> --executor-memory 110g \
>> --executor-cores 32 \
>>
>> But then, reduceByKey() fails with:
>>
>> java.lang.OutOfMemoryError: Java heap space
>>
>>
>>
>>
>> On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra 
>> wrote:
>>
>>> The Spark UI names the line number and name of the operation
>>> (repartition in this case) that it is performing. Only if this information
>>> is wrong (just a possibility), could it have started groupByKey already.
>>>
>>> I will try to analyze the amount of skew in the data by using
>>> reduceByKey (or simply countByKey) which is relatively inexpensive. For the
>>> purposes of this algorithm I can simply log and remove keys with huge
>>> counts, before doing groupByKey.
>>>
>>> On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson 
>>> wrote:
>>>
 All stated symptoms are consistent with GC pressure (other nodes
 timeout trying to connect because of a long stop-the-world), quite possibly
 due to groupByKey. groupByKey is a very expensive operation as it may bring
 all the data for a particular partition into memory (in particular, it
 cannot spill values for a single key, so if you have a single very skewed
 key you can get behavior like this).

 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc 
 wrote:

> But groupByKey will repartition according to the number of keys, as I
> understand how it works. How do you know that you haven't reached the
> groupByKey phase? Are you using a profiler, or do you base that assumption
> only on logs?
>
> On Sat, 28 Feb 2015, 8:12 PM, Arun Luthra <
> arun.lut...@gmail.com> wrote:
>
> A correction to my first post:
>>
>> There is also a repartition right before groupByKey to help avoid
>> too-many-open-files error:
>>
>>
>> rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>
>> On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra 
>> wrote:
>>
>>> The job fails before getting to groupByKey.
>>>
>>> I see a lot of timeout errors in the yarn logs, like:
>>>
>>> 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
>>> attempts
>>> akka.pattern.AskTimeoutException: Timed out
>>>
>>> and
>>>
>>> 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
>>> attempts
>>> java.util.concurrent.TimeoutException: Futures timed out after [30
>>> seconds]
>>>
>>> and some of these are followed by:
>>>
>>> 15/02/28 12:48:02 ERROR execut

Exception"Driver-Memory" while running Spark job on Yarn-cluster

2015-04-13 Thread sachin Singh
Hi,
When I submit a Spark job with --master yarn-cluster using the
command/options below, I get a driver 
memory error:

spark-submit --jars
./libs/mysql-connector-java-5.1.17.jar,./libs/log4j-1.2.17.jar --files
datasource.properties,log4j.properties --master yarn-cluster --num-executors
1 --driver-memory 2g --executor-memory 512m --class
com.test.spark.jobs.AggregationJob sparkagg.jar 

The exceptions in the YARN application log are as follows:
Container: container_1428938273236_0006_01_01 on 
mycom.hostname.com_8041
=
LogType: stderr
LogLength: 128
Log Contents:
Exception in thread "Driver"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "Driver"

LogType: stdout
LogLength: 40


Container: container_1428938273236_0006_02_01 on mycom.hostname.com_8041
=
LogType: stderr
LogLength: 1365
Log Contents:
java.io.IOException: Log directory
hdfs://mycom.hostname.com:8020/user/spark/applicationHistory/application_1428938273236_0006
already exists!
at
org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:129)
at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
at
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:353)
at
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
  
LogType: stdout
LogLength: 40


Please help, this is urgent for me.





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Parquet File Binary column statistics error when reuse byte[] among rows

2015-04-13 Thread Cheng Lian

Thanks Yijie! Also cc the user list.

Cheng

On 4/13/15 9:19 AM, Yijie Shen wrote:
I opened a new Parquet JIRA ticket here: 
https://issues.apache.org/jira/browse/PARQUET-251


Yijie

On April 12, 2015 at 11:48:57 PM, Cheng Lian (lian.cs@gmail.com) wrote:



Thanks for reporting this! Would you mind opening JIRA tickets for both
Spark and Parquet?

I'm not sure whether Parquet declares somewhere the user mustn't reuse
byte arrays when using binary type. If it does, then it's a Spark bug.
Anyway, this should be fixed.

Cheng

On 4/12/15 1:50 PM, Yijie Shen wrote:
> Hi,
>
> Suppose I create a dataRDD which extends RDD[Row], and each row is
> GenericMutableRow(Array(Int, Array[Byte])). The same Array[Byte] object is
> reused among rows but has different content each time. When I convert it to
> a DataFrame and save it as a Parquet file, the file's row group statistics
> (max & min) for the Binary column are wrong.
>
> Here is the reason: in Parquet, BinaryStatistic just keeps max & min as
> parquet.io.api.Binary references, and Spark SQL generates a new Binary
> backed by the same Array[Byte] passed from the row.
>
> reference backed max: Binary-->ByteArrayBackedBinary-->Array[Byte]
>
> Therefore, each time Parquet updates the row group's statistics, max & min
> always refer to the same Array[Byte], which has new content each
> time. When Parquet decides to save them into the file, the last row's content
> is saved as both max & min.
>
> It seems to be a Parquet bug, because it is Parquet's responsibility to
> update statistics correctly, but I am not quite sure. Should I report it as
> a bug in the Parquet JIRA?
>
> The Spark JIRA is https://issues.apache.org/jira/browse/SPARK-6859
>
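
The aliasing effect described in the report can be reproduced outside Parquet and Spark. The plain-Python sketch below is only an analogy: ReferenceStats, CopyingStats, and run are made-up names, and a bytearray stands in for the reused Array[Byte]. A statistics object that keeps references ends up reporting the last row's content as both min and max, while one that copies each value reports the true extremes.

```python
class ReferenceStats:
    """Tracks min/max by keeping references to the values it sees."""

    def __init__(self):
        self.min = None
        self.max = None

    def update(self, value):
        if self.min is None or value < self.min:
            self.min = value
        if self.max is None or value > self.max:
            self.max = value


class CopyingStats(ReferenceStats):
    """Same logic, but takes an immutable snapshot of each value first."""

    def update(self, value):
        super().update(bytes(value))


def run(stats):
    buf = bytearray(3)  # one buffer reused for every row
    for content in (b"aaa", b"zzz", b"mmm"):
        buf[:] = content  # overwrite in place, same object each time
        stats.update(buf)
    return bytes(stats.min), bytes(stats.max)


if __name__ == "__main__":
    print(run(ReferenceStats()))  # (b'mmm', b'mmm')
    print(run(CopyingStats()))    # (b'aaa', b'zzz')
```

Whether the copy belongs on the writer side or inside the statistics code is the open question in the thread; the sketch only shows why some copy is needed when buffers are reused.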





Reading files from http server

2015-04-13 Thread Peter Rudenko
Hi, I want to play with the Criteo 1 TB dataset. The files are located on Azure 
storage. Here's a command to download them:
curl -O http://azuremlsampleexperiments.blob.core.windows.net/criteo/day_{`seq -s ',' 0 23`}.gz
Is there any way to read files over HTTP with Spark without 
downloading them to HDFS first? Something like this:
sc.textFile("http://azuremlsampleexperiments.blob.core.windows.net/criteo/day_{0-23}.gz"),
so that it will have 24 partitions.


Thanks,
Peter Rudenko
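
Whatever the reading mechanism ends up being, the day_{0-23} range first has to be expanded into 24 explicit URLs, which is what the seq part of the curl command does. A plain-Python sketch of that expansion follows; the function name day_urls is made up, and nothing here touches the network or Spark.

```python
BASE_URL = "http://azuremlsampleexperiments.blob.core.windows.net/criteo"


def day_urls(first=0, last=23):
    """Expand the day range into one explicit URL per file."""
    return [f"{BASE_URL}/day_{day}.gz" for day in range(first, last + 1)]


if __name__ == "__main__":
    urls = day_urls()
    print(len(urls))
    print(urls[0])
    print(urls[-1])
```

One possible use, stated as an assumption rather than a tested recipe, is to distribute this list of 24 strings as a dataset with one element per partition and let each task download and decompress its own file.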



How to use multiple app jar files?

2015-04-13 Thread Michael Weir
My app works fine with the single, "uber" jar file containing my app and
all its dependencies. However, it takes about 20 minutes to copy the 65MB
jar file up to the node on the cluster, so my "code, compile, test" cycle
has become a "core, compile, cooppp, test" cycle.

I'd like to have a single dependencies jar file on the node, and use a
separate small jar for my app (which takes around 10 seconds to copy to the
node).

I've tried using "--jars deps.jar", but that copies the deps.jar to the
app-* folder but not the driver-* folder, so I get classNotFound errors on
the driver. Various other combinations of flags, etc. have produced a fair
bit of frustration but no progress.

Any help with this would be greatly appreciated, as this problem is
significantly stretching the length of my work day!

Thanks.


Re: MLlib : Gradient Boosted Trees classification confidence

2015-04-13 Thread mike
Thank you Peter.

I just want to be sure:
even if I use the "classification" setting, does GBT use regression trees
and not classification trees?

I know the difference between the two (theoretically) is only in the loss
and impurity functions.
Thus, if it uses classification trees, doing what you proposed will
result in the classification itself.

Also, by looking at the Scala API,
I found that each Node holds a Predict object which contains the "probability
of the label (classification only)" (
https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.mllib.tree.model.Predict
)
** This is what I called confidence.


So, to sum up my questions and confusion:
1. Does GBT use classification trees when set to classification, or
does it always use regression trees?
2. If it uses classification trees, how could I efficiently get to
the confidence = Node.Predict.prob?

Thanks again,
Michael



On Mon, Apr 13, 2015 at 10:13 AM, pprett [via Apache Spark User List] <
ml-node+s1001560n22470...@n3.nabble.com> wrote:

> Hi Mike,
>
> Gradient Boosted Trees (or gradient boosted regression trees) don't store
> probabilities in each leaf node but rather model a continuous function
> which is then transformed via a logistic sigmoid (i.e. like in a Logistic
> Regression model).
> If you are just interested in a confidence, you can use this continuous
> function directly: it's just the (weighted) sum of the predictions of the
> individual regression trees. Use the absolute value for confidence and the
> sign to determine the class label.
> Here is an example:
>
> def score(features: Vector): Double = {
> val treePredictions = gbdt.trees.map(_.predict(features))
> blas.ddot(gbdt.numTrees, treePredictions, 1, gbdt.treeWeights, 1)
> }
>
> If you are rather interested in probabilities, just pass the function
> value to a logistic sigmoid.
>
> best,
>  Peter
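
The arithmetic behind Peter's suggestion can be tried without Spark. The plain-Python sketch below takes per-tree predictions and weights as ordinary lists (in practice they would come from the trained model); the function names and the 0/1 label convention are assumptions made for illustration, and the exact scaling of the probability may depend on the loss used in training.

```python
import math


def score(tree_predictions, tree_weights):
    """Weighted sum of the individual regression-tree predictions."""
    return sum(w * p for p, w in zip(tree_predictions, tree_weights))


def sigmoid(x):
    """Numerically stable logistic function."""
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)


def classify(tree_predictions, tree_weights):
    """Return (label, confidence, probability-like value) for one example."""
    s = score(tree_predictions, tree_weights)
    label = 1 if s > 0 else 0  # the sign decides the class
    return label, abs(s), sigmoid(s)


if __name__ == "__main__":
    print(classify([1.0, -0.5, 0.25], [1.0, 0.1, 0.1]))
```

A score near zero gives a low confidence and a value near 0.5 after the sigmoid, while a score far from zero in either direction gives a high confidence.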

Sqoop parquet file not working in spark

2015-04-13 Thread bipin
Hi, I imported a table from MS SQL Server with Sqoop 1.4.5 in Parquet format.
But when I try to load it from the Spark shell, it throws an error like:

scala> val df1 = sqlContext.load("/home/bipin/Customer2")
scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown
during a parallel computation: java.lang.NullPointerException
parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249)
parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:543)
parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:520)
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:426)
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:298)
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:297)
scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
.
.
.
at scala.collection.parallel.package$$anon$1.alongWith(package.scala:87)
at scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86)
at
scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650)
at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72)
at
scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650)
at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190)
at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)
at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)
at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at 
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I looked at the Sqoop Parquet folder, and its structure is different from
the one that I created with Spark. How can I make the Parquet file work?







Re: How to use multiple app jar files?

2015-04-13 Thread ๏̯͡๏
I faced the exact same issue. The way I solved it was:

1. Copy the entire project.
2. In the copy, delete all the source and keep only the dependencies in pom.xml. This will
create a fat jar with the dependencies only, without source.
3. Keep the original project as is and build it. This will create a JAR
(no deps, by default).

Now add your deps jar to Spark with --jars.

This requires two projects, but that's better than before.

On Mon, Apr 13, 2015 at 4:45 PM, Michael Weir 
wrote:

> My app works fine with the single, "uber" jar file containing my app and
> all its dependencies. However, it takes about 20 minutes to copy the 65MB
> jar file up to the node on the cluster, so my "code, compile, test" cycle
> has become a "core, compile, cooppp, test" cycle.
>
> I'd like to have a single dependencies jar file on the node, and use a
> separate small jar for my app (which takes around 10 seconds to copy to the
> node).
>
> I've tried using "--jars deps.jar", but that copies the deps.jar to the
> app-* folder but not the driver-* folder, so I get classNotFound errors on
> the driver. Various other combinations of flags, etc. have produced a fair
> bit of frustration but no progress.
>
> Any help with this would be greatly appreciated, as this problem is
> significantly stretching the length of my work day!
>
> Thanks.
>



-- 
Deepak


Re: Exception"Driver-Memory" while running Spark job on Yarn-cluster

2015-04-13 Thread ๏̯͡๏
Try this:

./bin/spark-submit -v --master yarn-cluster --jars
./libs/mysql-connector-java-5.1.17.jar,./libs/log4j-1.2.17.jar --files
datasource.properties,log4j.properties  --num-executors 1 --driver-memory
4g --driver-java-options "-XX:MaxPermSize=1G" --executor-memory 2g
--executor-cores 1 --class com.test.spark.jobs.AggregationJob sparkagg.jar


I noticed that you're using mysql-connector-java-5.1.17.jar. Are you running
Spark SQL (Hive queries from Spark)?


On Mon, Apr 13, 2015 at 3:53 PM, sachin Singh 
wrote:

> Hi ,
> When I am submitting spark job as --master yarn-cluster with below
> command/options getting driver
> memory error-
>
> spark-submit --jars
> ./libs/mysql-connector-java-5.1.17.jar,./libs/log4j-1.2.17.jar --files
> datasource.properties,log4j.properties --master yarn-cluster
> --num-executors
> 1 --driver-memory 2g --executor-memory 512m --class
> com.test.spark.jobs.AggregationJob sparkagg.jar
>
> Exceptions as per yarn application ID log as under -
> Container: container_1428938273236_0006_01_01 on
> mycom.hostname.com_8041
>
> =
> LogType: stderr
> LogLength: 128
> Log Contents:
> Exception in thread "Driver"
> Exception: java.lang.OutOfMemoryError thrown from the
> UncaughtExceptionHandler in thread "Driver"
>
> LogType: stdout
> LogLength: 40
>
>
> Container: container_1428938273236_0006_02_01 on
> mycom.hostname.com_8041
>
> =
> LogType: stderr
> LogLength: 1365
> Log Contents:
> java.io.IOException: Log directory
> hdfs://
> mycom.hostname.com:8020/user/spark/applicationHistory/application_1428938273236_0006
> already exists!
> at
> org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:129)
> at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
> at
>
> org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
> at org.apache.spark.SparkContext.(SparkContext.scala:353)
> at
>
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
>
> LogType: stdout
> LogLength: 40
>
>
> please help its urgent for me,
>


-- 
Deepak


Packaging Java + Python library

2015-04-13 Thread Punya Biswal
Dear Spark users,

My team is working on a small library that builds on PySpark and is organized 
like PySpark as well -- it has a JVM component (that runs in the Spark driver 
and executor) and a Python component (that runs in the PySpark driver and 
executor processes). What's a good approach for packaging such a library?

Some ideas we've considered:

- Package up the JVM component as a Jar and the Python component as a binary egg.
  This is reasonable but it means that there are two separate artifacts that
  people have to manage and keep in sync.
- Include Python files in the Jar and add it to the PYTHONPATH. This follows the
  example of the Spark assembly jar, but deviates from the Python community's
  standards.

We'd really appreciate hearing experiences from other people who have built
libraries on top of PySpark.

Punya





Re: Packaging Java + Python library

2015-04-13 Thread prabeesh k
Refer to this post:
http://blog.prabeeshk.com/blog/2015/04/07/self-contained-pyspark-application/

On 13 April 2015 at 17:41, Punya Biswal  wrote:

> Dear Spark users,
>
> My team is working on a small library that builds on PySpark and is
> organized like PySpark as well -- it has a JVM component (that runs in the
> Spark driver and executor) and a Python component (that runs in the PySpark
> driver and executor processes). What's a good approach for packaging such a
> library?
>
> Some ideas we've considered:
>
>- Package up the JVM component as a Jar and the Python component as a
>binary egg. This is reasonable but it means that there are two separate
>artifacts that people have to manage and keep in sync.
>- Include Python files in the Jar and add it to the PYTHONPATH. This
>follows the example of the Spark assembly jar, but deviates from the Python
>community's standards.
>
> We'd really appreciate hearing experiences from other people who have
> built libraries on top of PySpark.
>
> Punya
>


Re: Spark TeraSort source request

2015-04-13 Thread Tom Hubregtsen
Thank you for your response Ewan. I quickly looked yesterday and it was
there, but today at work I tried to open it again to start working on it,
but it appears to be removed. Is this correct?

Thanks,

Tom

On 12 April 2015 at 06:58, Ewan Higgs  wrote:

>  Hi all.
> The code is linked from my repo:
>
> https://github.com/ehiggs/spark-terasort
> "
> This is an example Spark program for running TeraSort benchmarks. It is
> based on work from Reynold Xin's branch, but it is not the same TeraSort
> program that currently holds the record. That program is here.
> "
>
> "That program is here" links to:
>
> https://github.com/rxin/spark/tree/sort-benchmark/core/src/main/scala/org/apache/spark/sort
>
> I've been working on other projects at the moment so I haven't returned to
> the spark-terasort stuff. If you have any pull requests, I would be very
> grateful.
>
> Yours,
> Ewan
>
>
> On 08/04/15 03:26, Pramod Biligiri wrote:
>
> +1. I would love to have the code for this as well.
>
>  Pramod
>
> On Fri, Apr 3, 2015 at 12:47 PM, Tom  wrote:
>
>> Hi all,
>>
>> As we all know, Spark has set the record for sorting data, as published on:
>> https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html.
>>
>> Here at our group, we would love to verify these results and compare
>> machines using this benchmark. We've spent quite some time trying to find
>> the terasort source code that was used, but cannot find it anywhere.
>>
>> We did find two candidates:
>>
>> A version posted by Reynold [1], the poster of the message above. This
>> version is stuck at "// TODO: Add partition-local (external) sorting
>> using TeraSortRecordOrdering", only generating data.
>>
>> Here, Ewan noticed that "it didn't appear to be similar to Hadoop
>> TeraSort." [2] After this he created a version of his own [3]. With this
>> version, we noticed problems with TeraValidate with datasets above ~10G (as
>> mentioned by others at [4]). When examining the raw input and output files,
>> it actually appears that the input data is sorted and the output data
>> unsorted in both cases.
>>
>> Because of this, we believe we have not yet found the source code that was
>> actually used. I've tried to search the Spark user forum archives and saw
>> requests from other people, indicating a demand, but did not succeed in
>> finding the actual source code.
>>
>> My question:
>> Could you guys please make the source code of the TeraSort program that was
>> used, preferably with settings, available? If not, what are the reasons that
>> this seems to be withheld?
>>
>> Thanks for any help,
>>
>> Tom Hubregtsen
>>
>> [1]
>>
>> https://github.com/rxin/spark/commit/adcae69145905162fa3b6932f70be2c932f95f87
>> [2]
>>
>> http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/%3c5462092c.1060...@ugent.be%3E
>> [3] https://github.com/ehiggs/spark-terasort
>> [4]
>>
>> http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAPszQwgap4o1inZkTwcwV=7scwoqtr5yxfnsqo5p2kgp1bn...@mail.gmail.com%3E
>>
>>
>>
>
>


Configuring amount of disk space available to spark executors in mesos?

2015-04-13 Thread Jonathan Coveney
I'm surprised that I haven't been able to find this via google, but I
haven't...

What is the setting that requests some amount of disk space for the
executors? Maybe I'm misunderstanding how this is configured...

Thanks for any help!


Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread ๏̯͡๏
Code:

val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents).map {
    case (itemId, (listing, viDetail)) =>
      val viSummary = new VISummary
      viSummary.leafCategoryId = listing.getLeafCategId().toInt
      viSummary.itemSiteId = listing.getItemSiteId().toInt
      viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
      viSummary.sellerCountryId = listing.getSlrCntryId().toInt
      viSummary.buyerSegment = "0"
      viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
      val sellerId = listing.getSlrId.toLong
      (sellerId, (viDetail, viSummary, itemId))
  }

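Running Tasks (sorted by Status; the Errors column is empty for every task):

Index 0, ID 216, Attempt 0: RUNNING, PROCESS_LOCAL
  Executor ID / Host: 181 / phxaishdc9dn0474.phx.ebay.com
  Launch Time: 2015/04/13 06:43:53, Duration: 1.7 h, GC Time: 13 min
  Shuffle Read Size / Records: 3.0 GB / 56964921
  Write Time: (none), Shuffle Write Size / Records: 0.0 B / 0
  Shuffle Spill (Memory): 21.2 GB, Shuffle Spill (Disk): 1902.6 MB

Index 2, ID 218, Attempt 0: SUCCESS, PROCESS_LOCAL
  Executor ID / Host: 582 / phxaishdc9dn0235.phx.ebay.com
  Launch Time: 2015/04/13 06:43:53, Duration: 15 min, GC Time: 31 s
  Shuffle Read Size / Records: 2.2 GB / 1666851
  Write Time: 0.1 s, Shuffle Write Size / Records: 3.0 MB / 2062
  Shuffle Spill (Memory): 54.8 GB, Shuffle Spill (Disk): 1924.5 MB

Index 1, ID 217, Attempt 0: SUCCESS, PROCESS_LOCAL
  Executor ID / Host: 202 / phxdpehdc9dn2683.stratus.phx.ebay.com
  Launch Time: 2015/04/13 06:43:53, Duration: 19 min, GC Time: 1.3 min
  Shuffle Read Size / Records: 2.2 GB / 1687086
  Write Time: 75 ms, Shuffle Write Size / Records: 3.9 MB / 2692
  Shuffle Spill (Memory): 33.7 GB, Shuffle Spill (Disk): 1960.4 MB

Index 4, ID 220, Attempt 0: SUCCESS, PROCESS_LOCAL
  Executor ID / Host: 218 / phxaishdc9dn0855.phx.ebay.com
  Launch Time: 2015/04/13 06:43:53, Duration: 15 min, GC Time: 56 s
  Shuffle Read Size / Records: 2.2 GB / 1675654
  Write Time: 40 ms, Shuffle Write Size / Records: 3.3 MB / 2260
  Shuffle Spill (Memory): 26.2 GB, Shuffle Spill (Disk): 1928.4 MB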



Command:
./bin/spark-submit -v --master yarn-cluster --driver-class-path
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
--jars
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
 --num-executors 3000 --driver-memory 12g --driver-java-options
"-XX:MaxPermSize=6G" --executor-memory 12g --executor-cores 1 --queue
hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-04-6 endDate=2015-04-7
input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
output=/user/dvasthimal/epdatasets/viewItem buffersize=128
maxbuffersize=1068 maxResultSize=2G


What do I do? I killed the job twice and it's stuck again. Where is it
stuck?

-- 
Deepak


Spark Streaming Kafka Consumer, Confluent Platform, Avro & StorageLevel

2015-04-13 Thread Nicolas Phung
Hello,

I'm trying to use a Spark Streaming (1.2.0-cdh5.3.2) consumer
(via spark-streaming-kafka lib of the same version) with Kafka's Confluent
Platform 1.0.

I managed to create a producer that produces my data, and I can check it via
the avro-console-consumer:

"./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181
--from-beginning"

which displays :

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{"group":"","domainType":"job","action":"apply_freely","entity":"14564564132","user":{"string":"user"},"session":{"string":"session"},"date":"20150326T154052.000+0100","ip":"192.168.0.1","userAgent":"Mozilla/5.0
(X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko)
Chrome/41.0.2272.101 Safari/537.36","referer":"
http://www.kb.com/offre/controleur-de-gestion-h-f-9775029
","os":"Linux","deviceType":"Computer","browser":"Chrome","browserVersion":"41.0.2272.101","browserRenderer":"WEBKIT","physicalServerOrigin":"01.front.local"}
{"group":"","domainType":"job","action":"apply_freely","entity":"14564564132","user":{"string":"user"},"session":{"string":"session"},"date":"20150326T154052.000+0100","ip":"192.168.0.1","userAgent":"Mozilla/5.0
(X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko)
Chrome/41.0.2272.101 Safari/537.36","referer":"
http://www.kb.com/offre/controleur-de-gestion-h-f-9775029
","os":"Linux","deviceType":"Computer","browser":"Chrome","browserVersion":"41.0.2272.101","browserRenderer":"WEBKIT","physicalServerOrigin":"01.front.local"}

So far, so good! Now here's the weird thing. When I use the following with
spark-streaming-kafka:

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> brokers,
  "group.id" -> consumer,
  "zookeeper.connect" -> zookeeper,
  "auto.offset.reset" -> "smallest",
  "schema.registry.url" -> schemaRegistryUrl)
val topicMap = topics.split(",").map((_, 2)).toMap
val messages = KafkaUtils.createStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER)

val events = messages.map(_._2)
val dStream: DStream[Event] = events.map(
  avroMessage => {
    println("Avro content: " + avroMessage)
  }
)
dStream.print()


the Spark Streaming micro-batch shows me this:


15/04/10 16:11:40 INFO storage.BlockManager: Found block
input-0-1428675099000 locally
Avro content: {"group": "", "domainType": "", "action":
"", "entity": "", "user": "", "session": "", "date":
"", "ip": "", "userAgent": "", "referer": "", "os":
"", "deviceType": "", "browser": "", "browserVersion":
"", "browserRenderer": "", "physicalServerOrigin": ""}
Avro content: {"group": "", "domainType": "", "action":
"", "entity": "", "user": "", "session": "", "date":
"", "ip": "", "userAgent": "", "referer": "", "os":
"", "deviceType": "", "browser": "", "browserVersion":
"", "browserRenderer": "", "physicalServerOrigin": ""}


When I change the snippet to use a non-*_SER StorageLevel, like this:

val messages = KafkaUtils.createStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicMap,
StorageLevel.MEMORY_AND_DISK)


it works as expected:


15/04/10 16:29:02 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 2)
15/04/10 16:29:02 INFO storage.BlockManager: Found block
input-0-1428676140400 locally
Avro content: {"group": "", "domainType": "job", "action":
"apply_freely", "entity": "14564564132", "user": "user", "session":
"session", "date": "20150326T154052.000+0100", "ip": "192.168.0.1",
"userAgent": "Mozilla\/5.0 (X11; Linux x86_64) AppleWebKit\/537.36
(KHTML, like Gecko) Chrome\/41.0.2272.101 Safari\/537.36", "referer":
"http:\/\/www.kb.com\/offre\/controleur-de-gestion-h-f-9775029", "os":
"Linux", "deviceType": "Computer", "browser": "Chrome",
"browserVersion": "41.0.2272.101", "browserRenderer": "WEBKIT",
"physicalServerOrigin": "01.front.local"}
Avro content: {"group": "", "domainType": "job", "action":
"apply_freely", "entity": "14564564132", "user": "user", "session":
"session", "date": "20150326T154052.000+0100", "ip": "192.168.0.1",
"userAgent": "Mozilla\/5.0 (X11; Linux x86_64) AppleWebKit\/537.36
(KHTML, like Gecko) Chrome\/41.0.2272.101 Safari\/537.36", "referer":
"http:\/\/www.kb.com\/offre\/controleur-de-gestion-h-f-9775029", "os":
"Linux", "deviceType": "Computer", "browser": "Chrome",
"browserVersion": "41.0.2272.101", "browserRenderer": "WEBKIT",
"physicalServerOrigin": "01.front.local"}


If Kryo

What's the cleanest way to make spark aware of my custom scheduler?

2015-04-13 Thread Jonathan Coveney
I need to have my own scheduler to point to a proprietary remote execution
framework.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2152

I'm looking at where it decides on the backend and it doesn't look like
there is a hook. Of course I can extend sparkContext and add my own, but
that seems sort of lame. Wondering if people know of a better way (or maybe
I'm just missing something obvious)


Re: "Could not compute split, block not found" in Spark Streaming Simple Application

2015-04-13 Thread Saiph Kappa
Whether I use 1 or 2 machines, the results are the same... Here are the
results I got using 1 and 2 receivers with 2 machines:

2 machines, 1 receiver:

sbt/sbt "run-main Benchmark 1 machine1  1000" 2>&1 | grep -i "Total
delay\|record"

15/04/13 16:41:34 INFO JobScheduler: Total delay: 0.156 s for time
1428939694000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:41:35 INFO JobScheduler: Total delay: 0.155 s for time
1428939695000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:41:36 INFO JobScheduler: Total delay: 0.132 s for time
1428939696000 ms (execution: 0.119 s)
Received 92910 records
15/04/13 16:41:37 INFO JobScheduler: Total delay: 0.172 s for time
1428939697000 ms (execution: 0.161 s)
Received 92910 records
15/04/13 16:41:38 INFO JobScheduler: Total delay: 0.152 s for time
1428939698000 ms (execution: 0.140 s)
Received 92910 records
15/04/13 16:41:39 INFO JobScheduler: Total delay: 0.162 s for time
1428939699000 ms (execution: 0.149 s)
Received 92910 records
15/04/13 16:41:40 INFO JobScheduler: Total delay: 0.156 s for time
1428939700000 ms (execution: 0.143 s)
Received 92910 records
15/04/13 16:41:41 INFO JobScheduler: Total delay: 0.148 s for time
1428939701000 ms (execution: 0.135 s)
Received 92910 records
15/04/13 16:41:42 INFO JobScheduler: Total delay: 0.149 s for time
1428939702000 ms (execution: 0.135 s)
Received 92910 records
15/04/13 16:41:43 INFO JobScheduler: Total delay: 0.153 s for time
1428939703000 ms (execution: 0.136 s)
Received 92910 records
15/04/13 16:41:44 INFO JobScheduler: Total delay: 0.118 s for time
1428939704000 ms (execution: 0.111 s)
Received 92910 records
15/04/13 16:41:45 INFO JobScheduler: Total delay: 0.155 s for time
1428939705000 ms (execution: 0.143 s)
Received 92910 records
15/04/13 16:41:46 INFO JobScheduler: Total delay: 0.138 s for time
1428939706000 ms (execution: 0.126 s)
Received 92910 records
15/04/13 16:41:47 INFO JobScheduler: Total delay: 0.154 s for time
1428939707000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:41:48 INFO JobScheduler: Total delay: 0.172 s for time
1428939708000 ms (execution: 0.160 s)
Received 92910 records
15/04/13 16:41:49 INFO JobScheduler: Total delay: 0.144 s for time
1428939709000 ms (execution: 0.133 s)


Receiver Statistics (columns: Receiver, Status, Location, Records in last
batch [2015/04/13 16:53:54], Minimum rate [records/sec], Median rate
[records/sec], Maximum rate [records/sec], Last Error):

Receiver-0---10-10-100-

2 machines, 2 receivers:

sbt/sbt "run-main Benchmark 2 machine1  1000" 2>&1 | grep -i "Total
delay\|record"

Received 92910 records
15/04/13 16:43:13 INFO JobScheduler: Total delay: 0.153 s for time
1428939793000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:43:14 INFO JobScheduler: Total delay: 0.144 s for time
1428939794000 ms (execution: 0.136 s)
Received 92910 records
15/04/13 16:43:15 INFO JobScheduler: Total delay: 0.145 s for time
1428939795000 ms (execution: 0.132 s)
Received 92910 records
15/04/13 16:43:16 INFO JobScheduler: Total delay: 0.144 s for time
1428939796000 ms (execution: 0.134 s)
Received 92910 records
15/04/13 16:43:17 INFO JobScheduler: Total delay: 0.148 s for time
1428939797000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:43:18 INFO JobScheduler: Total delay: 0.136 s for time
1428939798000 ms (execution: 0.123 s)
Received 92910 records
15/04/13 16:43:19 INFO JobScheduler: Total delay: 0.155 s for time
1428939799000 ms (execution: 0.145 s)
Received 92910 records
15/04/13 16:43:20 INFO JobScheduler: Total delay: 0.160 s for time
1428939800000 ms (execution: 0.152 s)
Received 83619 records
15/04/13 16:43:21 INFO JobScheduler: Total delay: 0.141 s for time
1428939801000 ms (execution: 0.131 s)
Received 102201 records
15/04/13 16:43:22 INFO JobScheduler: Total delay: 0.208 s for time
1428939802000 ms (execution: 0.197 s)
Received 83619 records
15/04/13 16:43:23 INFO JobScheduler: Total delay: 0.160 s for time
1428939803000 ms (execution: 0.147 s)
Received 92910 records
15/04/13 16:43:24 INFO JobScheduler: Total delay: 0.197 s for time
1428939804000 ms (execution: 0.185 s)
Received 92910 records
15/04/13 16:43:25 INFO JobScheduler: Total delay: 0.200 s for time
1428939805000 ms (execution: 0.189 s)
Received 92910 records
15/04/13 16:43:26 INFO JobScheduler: Total delay: 0.181 s for time
1428939806000 ms (execution: 0.173 s)
Received 92910 records
15/04/13 16:43:27 INFO JobScheduler: Total delay: 0.189 s for time
1428939807000 ms (execution: 0.178 s)

Receiver Statistics (columns: Receiver, Status, Location, Records in last
batch [2015/04/13 16:49:36], Minimum rate [records/sec], Median rate
[records/sec], Maximum rate [records/sec], Last Error):

Receiver-0---10-10-10-9-Receiver-1---

On Thu, Apr 9, 2015 at 7:55 PM, Tathagata Das  wrote:

> Are you running # of receivers = # machines?
>
> TD
>
>

Help in transforming the RDD

2015-04-13 Thread Jeetendra Gangele
Hi all, I have a JavaPairRDD where each long key has 4 string values
associated with it. I want to fire an HBase query to look up each string
part of the RDD.
This lookup will return around 7K integers, so for each key I will have 7K
values. My input RDD is already more than a GB, and after getting these
results it will become around 50 GB, which I want to avoid.

My problem:
<1, Test1>
<1, test2>
<1, test3>
<1, test4>
...

Now I will query HBase for Test1, test2, test3 and test4 in parallel. Each
query will return around 2K results, so a total of 8K integers.

Now for each record I will have 1*8000 entries in my RDD, and supposing I
have 1 million records, it will become 1 million*8000, which is huge to
process even using groupBy.


Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread Jonathan Coveney
My guess would be data skew. Do you know if there is some item id that is a
catch all? can it be null? item id 0? lots of data sets have this sort of
value and it always kills joins

2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :

> Code:
>
> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
>   lstgItem.join(viEvents).map {
>     case (itemId, (listing, viDetail)) =>
>       val viSummary = new VISummary
>       viSummary.leafCategoryId = listing.getLeafCategId().toInt
>       viSummary.itemSiteId = listing.getItemSiteId().toInt
>       viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
>       viSummary.sellerCountryId = listing.getSlrCntryId().toInt
>       viSummary.buyerSegment = "0"
>       viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
>       val sellerId = listing.getSlrId.toLong
>       (sellerId, (viDetail, viSummary, itemId))
>   }
>
> Running Tasks (sorted by Status; the Errors column is empty for every task):
>
> Index 0, ID 216, Attempt 0: RUNNING, PROCESS_LOCAL
>   Executor ID / Host: 181 / phxaishdc9dn0474.phx.ebay.com
>   Launch Time: 2015/04/13 06:43:53, Duration: 1.7 h, GC Time: 13 min
>   Shuffle Read Size / Records: 3.0 GB / 56964921
>   Write Time: (none), Shuffle Write Size / Records: 0.0 B / 0
>   Shuffle Spill (Memory): 21.2 GB, Shuffle Spill (Disk): 1902.6 MB
>
> Index 2, ID 218, Attempt 0: SUCCESS, PROCESS_LOCAL
>   Executor ID / Host: 582 / phxaishdc9dn0235.phx.ebay.com
>   Launch Time: 2015/04/13 06:43:53, Duration: 15 min, GC Time: 31 s
>   Shuffle Read Size / Records: 2.2 GB / 1666851
>   Write Time: 0.1 s, Shuffle Write Size / Records: 3.0 MB / 2062
>   Shuffle Spill (Memory): 54.8 GB, Shuffle Spill (Disk): 1924.5 MB
>
> Index 1, ID 217, Attempt 0: SUCCESS, PROCESS_LOCAL
>   Executor ID / Host: 202 / phxdpehdc9dn2683.stratus.phx.ebay.com
>   Launch Time: 2015/04/13 06:43:53, Duration: 19 min, GC Time: 1.3 min
>   Shuffle Read Size / Records: 2.2 GB / 1687086
>   Write Time: 75 ms, Shuffle Write Size / Records: 3.9 MB / 2692
>   Shuffle Spill (Memory): 33.7 GB, Shuffle Spill (Disk): 1960.4 MB
>
> Index 4, ID 220, Attempt 0: SUCCESS, PROCESS_LOCAL
>   Executor ID / Host: 218 / phxaishdc9dn0855.phx.ebay.com
>   Launch Time: 2015/04/13 06:43:53, Duration: 15 min, GC Time: 56 s
>   Shuffle Read Size / Records: 2.2 GB / 1675654
>   Write Time: 40 ms, Shuffle Write Size / Records: 3.3 MB / 2260
>   Shuffle Spill (Memory): 26.2 GB, Shuffle Spill (Disk): 1928.4 MB
>
>
>
> Command:
> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
> --jars
> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>  --num-executors 3000 --driver-memory 12g --driver-java-options
> "-XX:MaxPermSize=6G" --executor-memory 12g --executor-cores 1 --queue
> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
> /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
> startDate=2015-04-6 endDate=2015-04-7
> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
> maxbuffersize=1068 maxResultSize=2G
>
>
> What do i do ? I killed the job twice and its stuck again. Where is it
> stuck ?
>
> --
> Deepak
>
>


Need some guidance

2015-04-13 Thread Marco Shaw
**Learning the ropes**

I'm trying to grasp the concept of using the pipeline in pySpark...

Simplified example:
>>> list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]

Desired outcome:
[(1,3),(2,2),(3,1)]

Basically for each key, I want the number of unique values.

I've tried different approaches, but am I really using Spark effectively?
I wondered if I would do something like:
>>> input=sc.parallelize(list)
>>> input.groupByKey().collect()

Then I wondered if I could do something like a foreach over each key value,
and then map the actual values and reduce them.  Pseudo-code:

input.groupbykey()
.keys
.foreach(_.values
.map(lambda x: x,1)
.reducebykey(lambda a,b:a+b)
.count()
)

I was somehow hoping that the key would get the current value of count, and
thus be the count of the unique keys, which is exactly what I think I'm
looking for.

Am I way off base on how I could accomplish this?
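
For reference, here is a minimal plain-Python sketch of the desired outcome, not a definitive answer. It mirrors three steps: drop duplicate (key, value) pairs, turn each remaining pair into (key, 1), and sum per key. The function name count_distinct_values is made up for illustration. In PySpark the same steps would presumably map onto the RDD calls distinct(), map() and reduceByKey(), which avoids collecting every value of a key with groupByKey().

```python
from collections import defaultdict


def count_distinct_values(pairs):
    """Return sorted (key, number of distinct values) tuples."""
    # Step 1: drop duplicate (key, value) pairs.
    # Assumed PySpark counterpart: rdd.distinct()
    unique_pairs = set(pairs)

    # Steps 2 and 3: count one per surviving pair and sum per key.
    # Assumed PySpark counterpart:
    #   .map(lambda kv: (kv[0], 1)).reduceByKey(lambda a, b: a + b)
    counts = defaultdict(int)
    for key, _value in unique_pairs:
        counts[key] += 1
    return sorted(counts.items())


data = [(1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
        (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")]
result = count_distinct_values(data)
print(result)  # [(1, 3), (2, 2), (3, 1)]
```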

Marco


Multipart upload to S3 fails with Bad Digest Exceptions

2015-04-13 Thread Eugen Cepoi
Hi,

I am not sure my problem is relevant to spark, but perhaps someone else had
the same error. When I try to write files that need multipart upload to S3
from a job on EMR I always get this error:

com.amazonaws.services.s3.model.AmazonS3Exception: The Content-MD5 you
specified did not match what we received.

If I disable multipart upload via fs.s3n.multipart.uploads.enabled (or
output smaller files that don't require multi part upload), then everything
works fine.

I've seen an old thread on the ML where someone has the same error, but in
my case I don't have any other errors on the worker nodes.

I am using spark 1.2.1 and hadoop 2.4.0.

Thanks,
Eugen


Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread ๏̯͡๏
Do you mean there is a tuple in either RDD that has itemID = 0 or null?
And what is a catch-all?

Does that imply it is a good idea to run a filter on each RDD first? We do
not do this using Pig on M/R. Is it required in the Spark world?

On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney 
wrote:

> My guess would be data skew. Do you know if there is some item id that is
> a catch all? can it be null? item id 0? lots of data sets have this sort of
> value and it always kills joins
>
> 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
>
> Code:
>>
>> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
>>   lstgItem.join(viEvents).map {
>>     case (itemId, (listing, viDetail)) =>
>>       val viSummary = new VISummary
>>       viSummary.leafCategoryId = listing.getLeafCategId().toInt
>>       viSummary.itemSiteId = listing.getItemSiteId().toInt
>>       viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
>>       viSummary.sellerCountryId = listing.getSlrCntryId().toInt
>>       viSummary.buyerSegment = "0"
>>       viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
>>       val sellerId = listing.getSlrId.toLong
>>       (sellerId, (viDetail, viSummary, itemId))
>>   }
>>
>> Running Tasks (sorted by Status; the Errors column is empty for every task):
>>
>> Index 0, ID 216, Attempt 0: RUNNING, PROCESS_LOCAL
>>   Executor ID / Host: 181 / phxaishdc9dn0474.phx.ebay.com
>>   Launch Time: 2015/04/13 06:43:53, Duration: 1.7 h, GC Time: 13 min
>>   Shuffle Read Size / Records: 3.0 GB / 56964921
>>   Write Time: (none), Shuffle Write Size / Records: 0.0 B / 0
>>   Shuffle Spill (Memory): 21.2 GB, Shuffle Spill (Disk): 1902.6 MB
>>
>> Index 2, ID 218, Attempt 0: SUCCESS, PROCESS_LOCAL
>>   Executor ID / Host: 582 / phxaishdc9dn0235.phx.ebay.com
>>   Launch Time: 2015/04/13 06:43:53, Duration: 15 min, GC Time: 31 s
>>   Shuffle Read Size / Records: 2.2 GB / 1666851
>>   Write Time: 0.1 s, Shuffle Write Size / Records: 3.0 MB / 2062
>>   Shuffle Spill (Memory): 54.8 GB, Shuffle Spill (Disk): 1924.5 MB
>>
>> Index 1, ID 217, Attempt 0: SUCCESS, PROCESS_LOCAL
>>   Executor ID / Host: 202 / phxdpehdc9dn2683.stratus.phx.ebay.com
>>   Launch Time: 2015/04/13 06:43:53, Duration: 19 min, GC Time: 1.3 min
>>   Shuffle Read Size / Records: 2.2 GB / 1687086
>>   Write Time: 75 ms, Shuffle Write Size / Records: 3.9 MB / 2692
>>   Shuffle Spill (Memory): 33.7 GB, Shuffle Spill (Disk): 1960.4 MB
>>
>> Index 4, ID 220, Attempt 0: SUCCESS, PROCESS_LOCAL
>>   Executor ID / Host: 218 / phxaishdc9dn0855.phx.ebay.com
>>   Launch Time: 2015/04/13 06:43:53, Duration: 15 min, GC Time: 56 s
>>   Shuffle Read Size / Records: 2.2 GB / 1675654
>>   Write Time: 40 ms, Shuffle Write Size / Records: 3.3 MB / 2260
>>   Shuffle Spill (Memory): 26.2 GB, Shuffle Spill (Disk): 1928.4 MB
>>
>>
>>
>> Command:
>> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>> --jars
>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>>  --num-executors 3000 --driver-memory 12g --driver-java-options
>> "-XX:MaxPermSize=6G" --executor-memory 12g --executor-cores 1 --queue
>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>> /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
>> startDate=2015-04-6 endDate=2015-04-7
>> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>> maxbuffersize=1068 maxResultSize=2G
>>
>>
>> What do i do ? I killed the job twice and its stuck again. Where is it
>> stuck ?
>>
>> --
>> Deepak
>>
>>
>


-- 
Deepak


Re: How to use multiple app jar files?

2015-04-13 Thread Marcelo Vanzin
You can copy the dependencies to all nodes in your cluster, and then
use "spark.{executor,driver}.extraClassPath" to add them to the
classpath of your executors / driver.
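
As a rough sketch of what that could look like, the snippet below assembles a spark-submit command line that passes both properties through --conf. Everything concrete in it is an assumption for illustration: the helper name build_submit_args, the path /opt/spark-deps/deps.jar (which would have to exist at the same location on every node), the class com.example.MyApp and the jar name my-app.jar.

```python
def build_submit_args(app_jar, main_class, deps_jar_on_nodes):
    """Build a spark-submit argument list for a small app jar.

    deps_jar_on_nodes is the path of the dependencies jar that was
    copied to every node ahead of time (hypothetical location).
    """
    return [
        "spark-submit",
        "--class", main_class,
        # Put the pre-copied dependencies on the driver classpath.
        "--conf", "spark.driver.extraClassPath=" + deps_jar_on_nodes,
        # Put the same jar on every executor's classpath.
        "--conf", "spark.executor.extraClassPath=" + deps_jar_on_nodes,
        # Only this small jar is uploaded on each submit.
        app_jar,
    ]


args = build_submit_args(
    app_jar="my-app.jar",                     # hypothetical
    main_class="com.example.MyApp",           # hypothetical
    deps_jar_on_nodes="/opt/spark-deps/deps.jar",  # hypothetical
)
print(" ".join(args))
```

With this layout only the small application jar travels with each submit; the dependencies jar has to be re-copied to the nodes only when the dependencies change.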



On Mon, Apr 13, 2015 at 4:15 AM, Michael Weir
 wrote:
> My app works fine with the single, "uber" jar file containing my app and all
> its dependencies. However, it takes about 20 minutes to copy the 65MB jar
> file up to the node on the cluster, so my "code, compile, test" cycle has
> become a "core, compile, cooppp, test" cycle.
>
> I'd like to have a single dependencies jar file on the node, and use a
> separate small jar for my app (which takes around 10 seconds to copy to the
> node).
>
> I've tried using "--jars deps.jar", but that copies the deps.jar to the
> app-* folder but not the driver-* folder, so I get classNotFound errors on
> the driver. Various other combinations of flags, etc. have produced a fair
> bit of frustration but no progress.
>
> Any help with this would be greatly appreciated, as this problem is
> significantly stretching the length of my work day!
>
> Thanks.



-- 
Marcelo




Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread Jonathan Coveney
I can promise you that this is also a problem in the Pig world :) Not sure
why it's not a problem for this data set, though... are you sure that the
two are running the exact same code?

You should inspect your source data. Make a histogram of the join key for
each input and see what the data distribution looks like. If there is a
value or bucket with a disproportionate share of the records, you know you
have an issue.
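
A minimal plain-Python sketch of that check, under stated assumptions: the sample data is invented, and the helper names key_histogram and skew_ratio are hypothetical. On a real RDD the per-key counts would presumably come from something like a map to (key, 1) followed by reduceByKey, ideally on a sample of the data.

```python
from collections import Counter


def key_histogram(pairs, top_n=3):
    """Count records per join key and return the top_n heaviest keys."""
    counts = Counter(key for key, _value in pairs)
    return counts.most_common(top_n)


def skew_ratio(pairs):
    """Share of all records held by the single most frequent key."""
    counts = Counter(key for key, _value in pairs)
    total = sum(counts.values())
    return max(counts.values()) / total if total else 0.0


# Invented sample: item id 0 plays the role of a catch-all value.
sample = [(0, "a")] * 8 + [(17, "b"), (42, "c")]
top_keys = key_histogram(sample)
ratio = skew_ratio(sample)
print(top_keys, ratio)
```

If one key holds most of the records, as item id 0 does in this made-up sample, every record for that key lands in a single task, which would match the one long-running task in the listing.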

2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :

> You mean there is a tuple in either RDD, that has itemID = 0 or null ?
> And what is catch all ?
>
> That implies is it a good idea to run a filter on each RDD first ? We do
> not do this using Pig on M/R. Is it required in Spark world ?
>
> On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney 
> wrote:
>
>> My guess would be data skew. Do you know if there is some item id that is
>> a catch all? can it be null? item id 0? lots of data sets have this sort of
>> value and it always kills joins
>>
>> 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
>>
>> Code:
>>>
>>> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
>>> Long))] = lstgItem.join(viEvents).map {
>>>   case (itemId, (listing, viDetail)) =>
>>> val viSummary = new VISummary
>>> viSummary.leafCategoryId = listing.getLeafCategId().toInt
>>> viSummary.itemSiteId = listing.getItemSiteId().toInt
>>> viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
>>> viSummary.sellerCountryId = listing.getSlrCntryId().toInt
>>> viSummary.buyerSegment = "0"
>>> viSummary.isBin = (if
>>> (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
>>> val sellerId = listing.getSlrId.toLong
>>> (sellerId, (viDetail, viSummary, itemId))
>>> }
>>>
>>> Running Tasks:
>>> Columns: Index | ID | Attempt | Status | Locality Level | Executor ID / Host
>>> | Launch Time | Duration | GC Time | Shuffle Read Size / Records
>>> | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory)
>>> | Shuffle Spill (Disk) | Errors
>>>
>>> 0 | 216 | 0 | RUNNING | PROCESS_LOCAL | 181 / phxaishdc9dn0474.phx.ebay.com
>>> | 2015/04/13 06:43:53 | 1.7 h | 13 min | 3.0 GB / 56964921
>>> | (none) | 0.0 B / 0 | 21.2 GB | 1902.6 MB | (none)
>>>
>>> 2 | 218 | 0 | SUCCESS | PROCESS_LOCAL | 582 / phxaishdc9dn0235.phx.ebay.com
>>> | 2015/04/13 06:43:53 | 15 min | 31 s | 2.2 GB / 1666851
>>> | 0.1 s | 3.0 MB / 2062 | 54.8 GB | 1924.5 MB | (none)
>>>
>>> 1 | 217 | 0 | SUCCESS | PROCESS_LOCAL | 202 / phxdpehdc9dn2683.stratus.phx.ebay.com
>>> | 2015/04/13 06:43:53 | 19 min | 1.3 min | 2.2 GB / 1687086
>>> | 75 ms | 3.9 MB / 2692 | 33.7 GB | 1960.4 MB | (none)
>>>
>>> 4 | 220 | 0 | SUCCESS | PROCESS_LOCAL | 218 / phxaishdc9dn0855.phx.ebay.com
>>> | 2015/04/13 06:43:53 | 15 min | 56 s | 2.2 GB / 1675654
>>> | 40 ms | 3.3 MB / 2260 | 26.2 GB | 1928.4 MB | (none)
>>>
>>>
>>>
>>> Command:
>>> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>> --jars
>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>>>  --num-executors 3000 --driver-memory 12g --driver-java-options
>>> "-XX:MaxPermSize=6G" --executor-memory 12g --executor-cores 1 --queue
>>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>>> /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
>>> startDate=2015-04-6 endDate=2015-04-7
>>> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>> maxbuffersize=1068 maxResultSize=2G
>>>
>>>
>>> What do i do ? I killed the job twice and its stuck again. Where is it
>>> stuck ?
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>
>
> --
> Deepak
>
>


feature scaling in GeneralizedLinearAlgorithm.scala

2015-04-13 Thread Jianguo Li
Hi,

In the GeneralizedLinearAlgorithm, which Logistic Regression relies on, it
says "if useFeatureScaling is enabled, we will standardize the training
features, and train the model in the scaled space. Then we transform
the coefficients from the scaled space to the original space ...".

My understanding then is we do not need to scale the test data since the
coefficients are already in the original space, is this correct?

Thanks

Jianguo


Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM

2015-04-13 Thread Tim Chen
Linux OOM throws SIGTERM, but if I remember correctly JVM handles heap
memory limits differently and throws OutOfMemoryError and eventually sends
SIGINT.

Not sure what happened but the worker simply received a SIGTERM signal, so
perhaps the daemon was terminated by someone or a parent process. Just my
guess.
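
For reference when reading the log line, a small Python sketch (not Spark code) that maps a signal number to its name. Signal 15 is SIGTERM, which a process can catch and log before exiting, as the worker did here. SIGKILL (9) cannot be caught, so a process killed that way gets no chance to write such a line. The `describe_signal` helper is hypothetical and assumes a POSIX system.

```python
import signal

def describe_signal(signum):
    """Return (name, can_be_caught) for a POSIX signal number."""
    sig = signal.Signals(signum)
    # SIGKILL and SIGSTOP can never be caught, blocked, or ignored.
    catchable = sig not in (signal.SIGKILL, signal.SIGSTOP)
    return sig.name, catchable

print(describe_signal(15))
print(describe_signal(9))
```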

Tim

On Mon, Apr 13, 2015 at 2:28 AM, Guillaume Pitel  wrote:

>  Very likely to be this :
>
> http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2
>
> Your worker ran out of memory => maybe you're asking for too much memory
> for the JVM, or something else is running on the worker
>
> Guillaume
>
>  Any idea what this means, many thanks
>
>  ==>
> logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1
> <==
> 15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 4
> cores, 6.6 GB RAM
> 15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
> 15/04/13 07:07:22 INFO Worker: Spark home:
> /remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
> 15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
> 15/04/13 07:07:22 INFO AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:8081
> 15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' on
> port 8081.
> 15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at
> http://09:8081
> 15/04/13 07:07:22 INFO Worker: Connecting to master
> akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
> 15/04/13 07:07:22 INFO Worker: Successfully registered with master
> spark://08:7077
> *15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM*
>
>
>
> --
>[image: eXenSa]
>  *Guillaume PITEL, Président*
> +33(0)626 222 431
>
> eXenSa S.A.S. 
>  41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>


RE: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread java8964
If it is really due to data skew, wouldn't the hanging task have a much bigger 
Shuffle Write Size?
In this case, the shuffle write size for that task is 0, and the rest of its 
I/O is not much larger than that of the tasks that finished quickly. Is that normal?
I am also interested in this case: how can the statistics in the UI 
indicate that a task has skewed data?
Yong 

Date: Mon, 13 Apr 2015 12:58:12 -0400
Subject: Re: Equi Join is taking for ever. 1 Task is Running while other 199 
are complete
From: jcove...@gmail.com
To: deepuj...@gmail.com
CC: user@spark.apache.org

I can promise you that this is also a problem in the pig world :) not sure why 
it's not a problem for this data set, though... are you sure that the two are 
doing the exact same code?
you should inspect your source data. Make a histogram for each and see what the 
data distribution looks like. If there is a value or bucket with a 
disproportionate set of values you know you have an issue
2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
You mean there is a tuple in either RDD, that has itemID = 0 or null ? And what 
is catch all ?
That implies is it a good idea to run a filter on each RDD first ? We do not do 
this using Pig on M/R. Is it required in Spark world ?
On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney  wrote:
My guess would be data skew. Do you know if there is some item id that is a 
catch all? can it be null? item id 0? lots of data sets have this sort of value 
and it always kills joins
2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
Code:
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
  lstgItem.join(viEvents).map {
    case (itemId, (listing, viDetail)) =>
      val viSummary = new VISummary
      viSummary.leafCategoryId = listing.getLeafCategId().toInt
      viSummary.itemSiteId = listing.getItemSiteId().toInt
      viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
      viSummary.sellerCountryId = listing.getSlrCntryId().toInt
      viSummary.buyerSegment = "0"
      viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
      val sellerId = listing.getSlrId.toLong
      (sellerId, (viDetail, viSummary, itemId))
  }
Running Tasks:
Columns: Index | ID | Attempt | Status | Locality Level | Executor ID / Host
| Launch Time | Duration | GC Time | Shuffle Read Size / Records
| Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory)
| Shuffle Spill (Disk) | Errors

0 | 216 | 0 | RUNNING | PROCESS_LOCAL | 181 / phxaishdc9dn0474.phx.ebay.com
| 2015/04/13 06:43:53 | 1.7 h | 13 min | 3.0 GB / 56964921
| (none) | 0.0 B / 0 | 21.2 GB | 1902.6 MB | (none)

2 | 218 | 0 | SUCCESS | PROCESS_LOCAL | 582 / phxaishdc9dn0235.phx.ebay.com
| 2015/04/13 06:43:53 | 15 min | 31 s | 2.2 GB / 1666851
| 0.1 s | 3.0 MB / 2062 | 54.8 GB | 1924.5 MB | (none)

1 | 217 | 0 | SUCCESS | PROCESS_LOCAL | 202 / phxdpehdc9dn2683.stratus.phx.ebay.com
| 2015/04/13 06:43:53 | 19 min | 1.3 min | 2.2 GB / 1687086
| 75 ms | 3.9 MB / 2692 | 33.7 GB | 1960.4 MB | (none)

4 | 220 | 0 | SUCCESS | PROCESS_LOCAL | 218 / phxaishdc9dn0855.phx.ebay.com
| 2015/04/13 06:43:53 | 15 min | 56 s | 2.2 GB / 1675654
| 40 ms | 3.3 MB / 2260 | 26.2 GB | 1928.4 MB | (none)


Command:
./bin/spark-submit -v --master yarn-cluster --driver-class-path 
/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
 --jars 
/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
  --num-executors 3000 --driver-memory 12g --driver-java-options 

Re: Need some guidance

2015-04-13 Thread Dean Wampler
The problem with using collect is that it will fail for large data sets, as
you'll attempt to copy the entire RDD to the memory of your driver program.
The following works (Scala syntax, but similar to Python):

scala> val i1 = input.distinct.groupByKey
scala> i1.foreach(println)
(1,CompactBuffer(beta, alpha, foo))
(3,CompactBuffer(foo))
(2,CompactBuffer(alpha, bar))

scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
scala> i2.foreach(println)
(1,3)
(3,1)
(2,2)

The "i2" line passes a function that takes a tuple argument, then
constructs a new output tuple with the first element and the size of the
second (each CompactBuffer). An alternative, using pattern-match syntax, would be:

scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

This should work as long as none of the CompactBuffers are too large, which
could happen for extremely large data sets.
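
If the buffers are a concern, one option is to deduplicate the (key, value) pairs first and then count per key, so that no per-key collection is ever built. Below is a plain-Python sketch of that idea using the sample list from the question; the `distinct_values_per_key` helper is made up for illustration. In PySpark the corresponding pipeline would be roughly `input.distinct().map(lambda kv: (kv[0], 1)).reduceByKey(lambda a, b: a + b)`, which I am offering as an untested outline.

```python
from collections import Counter

def distinct_values_per_key(pairs):
    """Count the unique values seen for each key."""
    unique_pairs = set(pairs)                        # plays the role of distinct()
    return Counter(key for key, _ in unique_pairs)   # like map to (key, 1) + reduceByKey

data = [(1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
        (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")]

print(sorted(distinct_values_per_key(data).items()))  # [(1, 3), (2, 2), (3, 1)]
```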

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw  wrote:

> **Learning the ropes**
>
> I'm trying to grasp the concept of using the pipeline in pySpark...
>
> Simplified example:
> >>>
> list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]
>
> Desired outcome:
> [(1,3),(2,2),(3,1)]
>
> Basically for each key, I want the number of unique values.
>
> I've tried different approaches, but am I really using Spark effectively?
> I wondered if I would do something like:
> >>> input=sc.parallelize(list)
> >>> input.groupByKey().collect()
>
> Then I wondered if I could do something like a foreach over each key
> value, and then map the actual values and reduce them.  Pseudo-code:
>
> input.groupbykey()
> .keys
> .foreach(_.values
> .map(lambda x: x,1)
> .reducebykey(lambda a,b:a+b)
> .count()
> )
>
> I was somehow hoping that the key would get the current value of count,
> and thus be the count of the unique keys, which is exactly what I think I'm
> looking for.
>
> Am I way off base on how I could accomplish this?
>
> Marco
>


Re: Multiple Kafka Recievers

2015-04-13 Thread Cody Koeninger
As far as I know, createStream doesn't let you specify where receivers are
run.

createDirectStream in 1.3 doesn't use long-running receivers, so it is
likely to give you more even distribution of consumers across your workers.

On Mon, Apr 13, 2015 at 11:31 AM, Laeeq Ahmed 
wrote:

> Hi,
>
> I have 4 workers and I am trying to have parallel Kafka receivers, 1 on
> each worker, with the following code.
>
> val kafkaStreams = (0 to args.length - 1).map{ i =>
> KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(i) ->
> 1),StorageLevel.MEMORY_ONLY).map(_._2) }
> val unifiedStream = ssc.union(kafkaStreams)
> unifiedStream.print()
>
> But I am getting receivers mostly on two workers (two on each), sometimes
> on three workers. What's wrong with the code?
>
> Regards,
> Laeeq
>
>


Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread Jonathan Coveney
I'm not 100% sure of Spark's implementation, but in the MR frameworks it
would have a much larger shuffle write size because that node is dealing
with a lot more data and as a result has a lot more to shuffle.
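
One way to read the task table quoted in this thread: the Shuffle Write of the running task is still 0, which may simply mean it has not finished, but its Shuffle Read record count is already far above the others. A small Python sketch of that comparison follows. The record counts are copied from the table; the `skew_ratios` helper is made up for illustration.

```python
from statistics import median

# Shuffle Read records per task ID, copied from the task table in this thread.
shuffle_read_records = {216: 56964921, 218: 1666851, 217: 1687086, 220: 1675654}

def skew_ratios(records_by_task):
    """Ratio of each task's record count to the median across tasks."""
    typical = median(records_by_task.values())
    return {task: count / typical for task, count in records_by_task.items()}

for task, ratio in sorted(skew_ratios(shuffle_read_records).items()):
    print(f"task {task}: {ratio:.1f}x the median")
```

Task 216 reads more than 30 times as many records as the median task, while the three finished tasks are all close to 1x.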

2015-04-13 13:20 GMT-04:00 java8964 :

> If it is really due to data skew, wouldn't the hanging task have a much
> bigger Shuffle Write Size?
>
> In this case, the shuffle write size for that task is 0, and the rest of
> its I/O is not much larger than that of the tasks that finished quickly.
> Is that normal?
>
> I am also interested in this case: how can the statistics in the UI
> indicate that a task has skewed data?
>
> Yong
>
> --
> Date: Mon, 13 Apr 2015 12:58:12 -0400
> Subject: Re: Equi Join is taking for ever. 1 Task is Running while other
> 199 are complete
> From: jcove...@gmail.com
> To: deepuj...@gmail.com
> CC: user@spark.apache.org
>
>
> I can promise you that this is also a problem in the pig world :) not sure
> why it's not a problem for this data set, though... are you sure that the
> two are doing the exact same code?
>
> you should inspect your source data. Make a histogram for each and see
> what the data distribution looks like. If there is a value or bucket with a
> disproportionate set of values you know you have an issue
>
> 2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
>
> You mean there is a tuple in either RDD, that has itemID = 0 or null ?
> And what is catch all ?
>
> That implies is it a good idea to run a filter on each RDD first ? We do
> not do this using Pig on M/R. Is it required in Spark world ?
>
> On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney 
> wrote:
>
> My guess would be data skew. Do you know if there is some item id that is
> a catch all? can it be null? item id 0? lots of data sets have this sort of
> value and it always kills joins
>
> 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
>
> Code:
>
> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
> Long))] = lstgItem.join(viEvents).map {
>   case (itemId, (listing, viDetail)) =>
> val viSummary = new VISummary
> viSummary.leafCategoryId = listing.getLeafCategId().toInt
> viSummary.itemSiteId = listing.getItemSiteId().toInt
> viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
> viSummary.sellerCountryId = listing.getSlrCntryId().toInt
> viSummary.buyerSegment = "0"
> viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue()
> > 0) 1 else 0)
> val sellerId = listing.getSlrId.toLong
> (sellerId, (viDetail, viSummary, itemId))
> }
>
> Running Tasks:
> Columns: Index | ID | Attempt | Status | Locality Level | Executor ID / Host
> | Launch Time | Duration | GC Time | Shuffle Read Size / Records
> | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory)
> | Shuffle Spill (Disk) | Errors
>
> 0 | 216 | 0 | RUNNING | PROCESS_LOCAL | 181 / phxaishdc9dn0474.phx.ebay.com
> | 2015/04/13 06:43:53 | 1.7 h | 13 min | 3.0 GB / 56964921
> | (none) | 0.0 B / 0 | 21.2 GB | 1902.6 MB | (none)
>
> 2 | 218 | 0 | SUCCESS | PROCESS_LOCAL | 582 / phxaishdc9dn0235.phx.ebay.com
> | 2015/04/13 06:43:53 | 15 min | 31 s | 2.2 GB / 1666851
> | 0.1 s | 3.0 MB / 2062 | 54.8 GB | 1924.5 MB | (none)
>
> 1 | 217 | 0 | SUCCESS | PROCESS_LOCAL | 202 / phxdpehdc9dn2683.stratus.phx.ebay.com
> | 2015/04/13 06:43:53 | 19 min | 1.3 min | 2.2 GB / 1687086
> | 75 ms | 3.9 MB / 2692 | 33.7 GB | 1960.4 MB | (none)
>
> 4 | 220 | 0 | SUCCESS | PROCESS_LOCAL | 218 / phxaishdc9dn0855.phx.ebay.com
> | 2015/04/13 06:43:53 | 15 min | 56 s | 2.2 GB / 1675654
> | 40 ms | 3.3 MB / 2260 | 26.2 GB | 1928.4 MB | (none)
>
>
>
> Command:
> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
> --jars
> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>  --num-executors 3000 --driver-memory 12g --driver-java-options
> "-XX:MaxPermSize=6G" --executor-memory 12g --executor-cores 1 --queue
> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
> /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
> startDate=2015-04-6 endDate=2015-04-7
> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
> maxbuffersize=1068 maxResultSize=2G
>
>
> What do i do ? I killed the job twice and its stuck again. Where is it
> stuck ?
>
> --
> Deepak
>
>
>
>
>
> --
> Deepak
>
>
>


Re: Multipart upload to S3 fails with Bad Digest Exceptions

2015-04-13 Thread Eugen Cepoi
I think I found where the problem comes from.

I am writing lzo compressed thrift records using elephant-bird, my guess is
that perhaps one side is computing the checksum based on the uncompressed
data and the other on the compressed data, thus getting a mismatch.
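
To illustrate the guess, here is a small Python sketch showing that the Content-MD5 value (the base64-encoded MD5 digest) differs depending on whether it is computed before or after compression. zlib stands in for lzo here, and the payload and the `content_md5` helper are made up for illustration.

```python
import base64
import hashlib
import zlib

def content_md5(payload: bytes) -> str:
    """Base64-encoded MD5 digest, the form used by the Content-MD5 header."""
    return base64.b64encode(hashlib.md5(payload).digest()).decode("ascii")

record = b"some thrift record bytes " * 100
compressed = zlib.compress(record)   # zlib is only a stand-in for lzo

print(content_md5(record))
print(content_md5(compressed))
```

The two printed values differ, so a sender and receiver that hash different stages of the same data would report a mismatch.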

When writing the data as strings using a plain TextOutputFormat, the multi
part upload works, this confirms that the lzo compression is probably the
problem... but it is not a solution :(

2015-04-13 18:46 GMT+02:00 Eugen Cepoi :

> Hi,
>
> I am not sure my problem is relevant to spark, but perhaps someone else
> had the same error. When I try to write files that need multipart upload to
> S3 from a job on EMR I always get this error:
>
> com.amazonaws.services.s3.model.AmazonS3Exception: The Content-MD5 you
> specified did not match what we received.
>
> If I disable multipart upload via fs.s3n.multipart.uploads.enabled (or
> output smaller files that don't require multi part upload), then everything
> works fine.
>
> I've seen an old thread on the ML where someone has the same error, but in
> my case I don't have any other errors on the worker nodes.
>
> I am using spark 1.2.1 and hadoop 2.4.0.
>
> Thanks,
> Eugen
>


Re: counters in spark

2015-04-13 Thread Grandl Robert
Guys,
Do you have any thoughts on this ?

Thanks,
Robert


 On Sunday, April 12, 2015 5:35 PM, Grandl Robert 
 wrote:
   

 Hi guys,
I was trying to find counters in Spark for the amount of CPU or memory (in 
some metric) used by a task/stage/job, but I could not find any. 
Is there any such counter available?
Thank you,
Robert





  

Re: Spark 1.3.0: Running Pi example on YARN fails

2015-04-13 Thread Zhan Zhang
Hi Zork,

From the exception, it is still caused by hdp.version not being propagated 
correctly. Can you check whether there is any typo?

[root@c6402 conf]# more java-opts
-Dhdp.version=2.2.0.0-2041

[root@c6402 conf]# more spark-defaults.conf
spark.driver.extraJavaOptions  -Dhdp.version=2.2.0.0-2041
spark.yarn.am.extraJavaOptions  -Dhdp.version=2.2.0.0-2041
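
One kind of typo that is hard to spot by eye is a typographic dash pasted in place of the ASCII hyphen in the version string. Whether that is the cause here is only a guess, but it is cheap to check. A small Python sketch, with a made-up `non_ascii_chars` helper, that lists any non-ASCII characters in an option string:

```python
def non_ascii_chars(text):
    """Return (position, character, code point) for every non-ASCII character."""
    return [(i, ch, f"U+{ord(ch):04X}") for i, ch in enumerate(text) if ord(ch) > 127]

good = "-Dhdp.version=2.2.0.0-2041"
bad = "-Dhdp.version=2.2.0.0\u20132041"   # en dash pasted in place of the hyphen

print(non_ascii_chars(good))  # []
print(non_ascii_chars(bad))
```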

This is HDP specific question, and you can move the topic to HDP forum.


Thanks.

Zhan Zhang


On Apr 13, 2015, at 3:00 AM, Zork Sail wrote:

Hi Zhan,
Alas setting:

-Dhdp.version=2.2.0.0–2041

Does not help. Still get the same error:
15/04/13 09:53:59 INFO yarn.Client:
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1428918838408
 final status: UNDEFINED
 tracking URL: 
http://foo.bar.site:8088/proxy/application_1427875242006_0037/
 user: test
15/04/13 09:54:00 INFO yarn.Client: Application report for 
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:01 INFO yarn.Client: Application report for 
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:02 INFO yarn.Client: Application report for 
application_1427875242006_0037 (state: ACCEPTED)
15/04/13 09:54:03 INFO yarn.Client: Application report for 
application_1427875242006_0037 (state: FAILED)
15/04/13 09:54:03 INFO yarn.Client:
 client token: N/A
 diagnostics: Application application_1427875242006_0037 failed 2 times due 
to AM Container for appattempt_1427875242006_0037_02 exited with  exitCode: 
1
For more detailed output, check application tracking 
page:http://foo.bar.site:8088/proxy/application_1427875242006_0037/Then, click 
on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1427875242006_0037_02_01
Exit code: 1
Exception message: 
/mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh:
 line 27: 
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
 bad substitution

Stack trace: ExitCodeException exitCode=1: 
/mnt/hdfs01/hadoop/yarn/local/usercache/test/appcache/application_1427875242006_0037/container_1427875242006_0037_02_01/launch_container.sh:
 line 27: 
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
 bad substitution

at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1428918838408
 final status: FAILED
 tracking URL: 
http://foo.bar.site:8088/cluster/app/application_142787524

Re: Opening many Parquet files = slow

2015-04-13 Thread Eric Eijkelenboom
Hi guys

Does anyone know how to stop Spark from opening all Parquet files before 
starting a job? This is quite a show stopper for me, since I have 5000 Parquet 
files on S3.

Recap of what I tried: 

1. Disable schema merging with: sqlContext.load("parquet", Map("mergeSchema" -> 
"false", "path" -> "s3://path/to/folder"))
This opens most files in the folder (17 out of 21 in my small example). For 
5000 files on S3, sqlContext.load() takes 30 minutes to complete. 

2. Use the old api with: 
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
Now sqlContext.parquetFile() only opens a few files and prints the schema: 
so far so good! However, as soon as I run e.g. a count() on the dataframe, 
Spark still opens all files _before_ starting a job/stage. Effectively this 
moves the delay from load() to count() (or any other action I presume).

3. Run Spark 1.3.1-rc2.
sqlContext.load() took about 30 minutes for 5000 Parquet files on S3, the 
same as 1.3.0.

Any help would be greatly appreciated!

Thanks a lot. 

Eric




> On 10 Apr 2015, at 16:46, Eric Eijkelenboom  
> wrote:
> 
> Hi Ted
> 
> Ah, I guess the term ‘source’ confused me :)
> 
> Doing:
> 
> sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to a 
> single day of logs")) 
> 
> for 1 directory with 21 files, Spark opens 17 files: 
> 
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 
> 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72'
> for reading
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 
> 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for 
> reading at position '261573524'
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 
> 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74'
> for reading
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 
> 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77'
> for reading
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 
> 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62'
> for reading
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 
> 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for 
> reading at position '259256807'
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 
> 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for 
> reading at position '260002042'
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 
> 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for 
> reading at position '260875275'
> etc.
> 
> I can’t seem to pass a comma-separated list of directories to load(), so in 
> order to load multiple days of logs, I have to point to the root folder and 
> depend on auto-partition discovery (unless there’s a smarter way). 
> 
> Doing: 
> 
> sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to 
> root log dir")) 
> 
> starts opening what seems like all files (I killed the process after a couple 
> of minutes).
> 
> Thanks for helping out. 
> Eric



Re: feature scaling in GeneralizedLinearAlgorithm.scala

2015-04-13 Thread Xiangrui Meng
Correct. Prediction doesn't touch that code path. -Xiangrui
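
A small numeric sketch of why this works, in plain Python. It assumes the scaling divides each feature by its standard deviation without subtracting the mean, so the intercept is unaffected; the numbers and the `to_original_space` helper are made up for illustration and are not MLlib code.

```python
def to_original_space(scaled_weights, stds):
    """Map weights learned on x / std back to weights that apply to raw x."""
    return [w / s if s != 0 else 0.0 for w, s in zip(scaled_weights, stds)]

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

stds = [2.0, 0.5, 4.0]                # hypothetical per-feature standard deviations
scaled_weights = [1.0, -1.0, 8.0]     # hypothetical weights learned in the scaled space
raw_features = [4.0, 1.0, 2.0]        # an unscaled test point

scaled_features = [x / s for x, s in zip(raw_features, stds)]
original_weights = to_original_space(scaled_weights, stds)

# Both margins agree, so the test point never needs scaling.
print(dot(scaled_weights, scaled_features), dot(original_weights, raw_features))
```

Since w' . (x / std) equals (w' / std) . x term by term, the converted coefficients give the same margin on raw features as the scaled coefficients give on scaled features.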

On Mon, Apr 13, 2015 at 9:58 AM, Jianguo Li 
wrote:

> Hi,
>
> In the GeneralizedLinearAlgorithm, which Logistic Regression relies on, it
> says "if useFeatureScaling is enabled, we will standardize the training
> features, and train the model in the scaled space. Then we transform
> the coefficients from the scaled space to the original space ...".
>
> My understanding then is we do not need to scale the test data since the
> coefficients are already in the original space, is this correct?
>
> Thanks
>
> Jianguo
>
>
>
>


Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM

2015-04-13 Thread Guillaume Pitel
That's why I think it's the OOM killer. There are several cases of 
memory overuse / errors :


1 - The application tries to allocate more than the Heap limit and GC 
cannot free more memory => OutOfMemory : Java Heap Space exception from JVM
2 - The jvm is configured with a max heap size larger than the available 
memory. At some point the application needs to allocate memory in JVM, 
the JVM tries to extend its heap and allocate real memory (or maybe the 
OS is configured with overcommit virtual memory), but fails => "Kill 
process or sacrifice child" (or others, depending on various factors : 
https://plumbr.eu/outofmemoryerror)
3 - The jvm has allocated its memory from the beginning and it has been 
served, but other processes start starving from memory shortage, the 
pressure on memory grows beyond the threshold configured in the OOM 
Killer, and boom, the java process is selected for a sacrifice because 
it is the main culprit of memory consumption.


Guillaume
Linux OOM throws SIGTERM, but if I remember correctly JVM handles heap 
memory limits differently and throws OutOfMemoryError and eventually 
sends SIGINT.


Not sure what happened but the worker simply received a SIGTERM 
signal, so perhaps the daemon was terminated by someone or a parent 
process. Just my guess.


Tim

On Mon, Apr 13, 2015 at 2:28 AM, Guillaume Pitel wrote:


Very likely to be this :

http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2

Your worker ran out of memory => maybe you're asking for too much
memory for the JVM, or something else is running on the worker

Guillaume

Any idea what this means, many thanks

==>
logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1
<==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910
with 4 cores, 6.6 GB RAM
15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home:
/remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:8081

15/04/13 07:07:22 INFO Utils: Successfully started service
'WorkerUI' on port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at
http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master
akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with
master spark://08:7077
*15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM*




--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. 
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Spark TeraSort source request

2015-04-13 Thread Ewan Higgs

Tom,
According to Github's public activity log, Reynold Xin (in CC) deleted 
his sort-benchmark branch yesterday. I didn't have a local copy aside 
from the Daytona Partitioner (attached).


Reynold, is it possible to reinstate your branch?

-Ewan

On 13/04/15 16:41, Tom Hubregtsen wrote:
Thank you for your response Ewan. I quickly looked yesterday and it 
was there, but today at work I tried to open it again to start working 
on it, but it appears to be removed. Is this correct?


Thanks,

Tom

On 12 April 2015 at 06:58, Ewan Higgs wrote:


Hi all.
The code is linked from my repo:

https://github.com/ehiggs/spark-terasort
"
This is an example Spark program for running TeraSort benchmarks.
It is based on work from Reynold Xin's branch, but it is not the
same TeraSort program that currently holds the record. That program
is here.
"

"That program is here" links to:

https://github.com/rxin/spark/tree/sort-benchmark/core/src/main/scala/org/apache/spark/sort

I've been working on other projects at the moment so I haven't
returned to the spark-terasort stuff. If you have any pull
requests, I would be very grateful.

Yours,
Ewan


On 08/04/15 03:26, Pramod Biligiri wrote:

+1. I would love to have the code for this as well.

Pramod

On Fri, Apr 3, 2015 at 12:47 PM, Tom wrote:

Hi all,

As we all know, Spark has set the record for sorting data, as
published on:
https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html.

Here at our group, we would love to verify these results and compare
machines using this benchmark. We've spent quite some time trying to
find the TeraSort source code that was used, but cannot find it anywhere.

We did find two candidates:

A version posted by Reynold [1], the poster of the message above. This
version is stuck at "// TODO: Add partition-local (external) sorting
using TeraSortRecordOrdering", only generating data.

Here, Ewan noticed that "it didn't appear to be similar to Hadoop
TeraSort." [2] After this he created a version of his own [3]. With this
version, we noticed problems with TeraValidate with datasets above ~10G
(as mentioned by others at [4]). When examining the raw input and output
files, it actually appears that the input data is sorted and the output
data unsorted in both cases.

Because of this, we believe we have not yet found the source code that
was actually used. I've searched the Spark user forum archives and seen
requests from other people, indicating a demand, but did not succeed in
finding the actual source code.

My question:
Could you please make the source code of the TeraSort program that was
used, preferably with settings, available? If not, what are the reasons
that this seems to be withheld?

Thanks for any help,

Tom Hubregtsen

[1] https://github.com/rxin/spark/commit/adcae69145905162fa3b6932f70be2c932f95f87
[2] http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/%3c5462092c.1060...@ugent.be%3E
[3] https://github.com/ehiggs/spark-terasort
[4] http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAPszQwgap4o1inZkTwcwV=7scwoqtr5yxfnsqo5p2kgp1bn...@mail.gmail.com%3E



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-TeraSort-source-request-tp22371.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org








/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES

Re: Configuring amount of disk space available to spark executors in mesos?

2015-04-13 Thread Patrick Wendell
Hey Jonathan,

Are you referring to disk space used for storing persisted RDD's? For
that, Spark does not bound the amount of data persisted to disk. It's
a similar story to how Spark's shuffle disk output works (and also
Hadoop and other frameworks make this assumption as well for their
shuffle data, AFAIK).

We could (in theory) add a storage level that bounds the amount of
data persisted to disk and forces re-computation if the partition did
not fit. I'd be interested to hear more about a workload where that's
relevant though, before going that route. Maybe if people are using
SSD's that would make sense.

- Patrick

On Mon, Apr 13, 2015 at 8:19 AM, Jonathan Coveney  wrote:
> I'm surprised that I haven't been able to find this via google, but I
> haven't...
>
> What is the setting that requests some amount of disk space for the
> executors? Maybe I'm misunderstanding how this is configured...
>
> Thanks for any help!




Re: Configuring amount of disk space available to spark executors in mesos?

2015-04-13 Thread Jonathan Coveney
Nothing so complicated... we are seeing mesos kill off our executors
immediately. When I reroute logging to an NFS directory we have available,
the executors survive fine. As such I am wondering if the spark workers are
getting killed by mesos for exceeding their disk quota (which atm is 0).
This could be a red herring, however.

2015-04-13 15:41 GMT-04:00 Patrick Wendell :

> Hey Jonathan,
>
> Are you referring to disk space used for storing persisted RDD's? For
> that, Spark does not bound the amount of data persisted to disk. It's
> a similar story to how Spark's shuffle disk output works (and also
> Hadoop and other frameworks make this assumption as well for their
> shuffle data, AFAIK).
>
> We could (in theory) add a storage level that bounds the amount of
> data persisted to disk and forces re-computation if the partition did
> not fit. I'd be interested to hear more about a workload where that's
> relevant though, before going that route. Maybe if people are using
> SSD's that would make sense.
>
> - Patrick
>
> On Mon, Apr 13, 2015 at 8:19 AM, Jonathan Coveney 
> wrote:
> > I'm surprised that I haven't been able to find this via google, but I
> > haven't...
> >
> > What is the setting that requests some amount of disk space for the
> > executors? Maybe I'm misunderstanding how this is configured...
> >
> > Thanks for any help!
>


How to load avro file into spark not on Hadoop in pyspark?

2015-04-13 Thread sa
I have 2 questions related to pyspark:

1. How do I load an avro file that is on the local filesystem as opposed to
hadoop? I tried the following and I just get NullPointerExceptions:

avro_rdd = sc.newAPIHadoopFile(
    "file:///c:/my-file.avro",
    "org.apache.avro.mapreduce.AvroKeyInputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
    conf=None)

2. If I have a stream of bytes with the avro "avrobytes", is there a way I
can create a spark context from it?

Let me know if either of the two above is possible, and if so, how.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-load-avro-file-into-spark-not-on-Hadoop-in-pyspark-tp22480.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Need some guidance

2015-04-13 Thread Victor Tso-Guillen
How about this?

input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
(agg1: Int, agg2: Int) => agg1 + agg2).collect()

On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler 
wrote:

> The problem with using collect is that it will fail for large data sets,
> as you'll attempt to copy the entire RDD to the memory of your driver
> program. The following works (Scala syntax, but similar to Python):
>
> scala> val i1 = input.distinct.groupByKey
> scala> i1.foreach(println)
> (1,CompactBuffer(beta, alpha, foo))
> (3,CompactBuffer(foo))
> (2,CompactBuffer(alpha, bar))
>
> scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
> scala> i2.foreach(println)
> (1,3)
> (3,1)
> (2,2)
>
> The "i2" line passes a function that takes a tuple argument, then
> constructs a new output tuple with the first element and the size of the
> second (each CompactBuffer). An alternative pattern match syntax would be.
>
> scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }
>
> This should work as long as none of the CompactBuffers are too large,
> which could happen for extremely large data sets.
>
> dean
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
>  (O'Reilly)
> Typesafe 
> @deanwampler 
> http://polyglotprogramming.com
>
> On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw  wrote:
>
>> **Learning the ropes**
>>
>> I'm trying to grasp the concept of using the pipeline in pySpark...
>>
>> Simplified example:
>> >>> list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]
>>
>> Desired outcome:
>> [(1,3),(2,2),(3,1)]
>>
>> Basically for each key, I want the number of unique values.
>>
>> I've tried different approaches, but am I really using Spark
>> effectively?  I wondered if I would do something like:
>> >>> input=sc.parallelize(list)
>> >>> input.groupByKey().collect()
>>
>> Then I wondered if I could do something like a foreach over each key
>> value, and then map the actual values and reduce them.  Pseudo-code:
>>
>> input.groupbykey()
>> .keys
>> .foreach(_.values
>> .map(lambda x: x,1)
>> .reducebykey(lambda a,b:a+b)
>> .count()
>> )
>>
>> I was somehow hoping that the key would get the current value of count,
>> and thus be the count of the unique keys, which is exactly what I think I'm
>> looking for.
>>
>> Am I way off base on how I could accomplish this?
>>
>> Marco
>>
>
>


Re: Spark SQL Parquet as External table - 1.3.x HiveMetastoreType now hidden

2015-04-13 Thread Michael Armbrust
>
> Here is the stack trace. The first part shows the log when the session is
> started in Tableau. It is using the init sql option on the data
> connection to create the TEMPORARY table myNodeTable.
>

Ah, I see. Thanks for providing the error.  The problem here is that
temporary tables do not exist in a database.  They are visible no matter
what the current database is.  Tableau is asking for
default.temporaryTable, which does not exist.


Re: Spark support for Hadoop Formats (Avro)

2015-04-13 Thread Michael Armbrust
The problem is likely that the underlying avro library is reusing objects
for speed.  You probably need to explicitly copy the values out of the
reused record before the collect.
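
To illustrate why object reuse bites once records are held onto, here is a minimal plain-Python sketch. The `reusing_reader` generator is hypothetical: it only mimics a reader that hands back the same mutable record on every call, and it is not Avro's or Spark's API. The symptom in the thread looks different in detail, but the general remedy (copy before you keep a reference) is the same idea.

```python
import copy

def reusing_reader(rows):
    """Hypothetical reader that reuses one mutable record object for speed."""
    record = {}
    for row in rows:
        record.clear()
        record.update(row)
        yield record  # the same object is yielded every time

rows = [{"guid": "a", "itemId": 1}, {"guid": "b", "itemId": 2}]

# Keeping the yielded objects stores N references to one record,
# so every entry ends up showing whatever was written last.
aliased = list(reusing_reader(rows))

# Copying each record as it is read gives every entry its own data.
copied = [copy.deepcopy(r) for r in reusing_reader(rows)]

print(aliased)
print(copied)
```

In the Spark job, the analogous step would presumably be a map that copies each record before the collect or the join.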

On Sat, Apr 11, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> The read seems to be successful, as the values for each field in the record
> are different and correct. The problem is when I collect it or trigger the next
> processing step (a join with another table); each of these probably triggers
> serialization, and that's when all the fields in the record get the value of
> the first field (or element).
>
>
>
> On Sun, Apr 12, 2015 at 9:14 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
> wrote:
>
>> We have very large processing being done on Hadoop (400 M/r Jobs, 1 Day
>> duration, 100s of TB data, 100s of joins). We are exploring Spark as
>> alternative to speed up our processing time. We use Scala + Scoobie today
>> and Avro is the data format across steps.
>>
>>
>> I observed a strange behavior: I read sample data (Avro format, 10
>> records), collect it, and print each record. All the data for each
>> element within a record is wiped out, and I only see the data of the first
>> element copied into everything.
>>
>> Is this a problem with Spark ? Or with using Avro ?
>>
>>
>> Example:
>>
>> I took that RDD run through it and printed 4 elements from it, they all
>> printed correctly.
>>
>>
>> val x = viEvents.map {
>>   case (itemId, event) =>
>>     println(event.get("guid"), itemId, event.get("itemId"),
>>       event.get("siteId"))
>>     (itemId, event)
>> }
>>
>> The above code prints
>>
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
>> (340da8c014a46272c0c8c830011c3bf0,221733319941,221733319941,77)
>> (340da8c014a46272c0c8c830011c3bf0,181704048554,181704048554,77)
>> (340da8c014a46272c0c8c830011c3bf0,231524481696,231524481696,77)
>> (340da8c014a46272c0c8c830011c3bf0,271830464992,271830464992,77)
>> (393938d71480a2aaf8e440d1fff709f4,141586046141,141586046141,0)
>> (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
>> (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
>>
>> viEvents.collect.foreach(a => println(a._2.get("guid"), a._1,
>> a._2.get("itemId"), a._2.get("siteId")))
>>
>> *Now I collected it; this might have led to serialization of the RDD.* Now,
>> when I print the same 4 elements, *I only get guid values for all. Has
>> this got something to do with serialization?*
>>
>>
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
>>
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
>>
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
>>
>> (340da8c014a46272c0c8c830011c3bf0,221733319941,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
>>
>> (340da8c014a46272c0c8c830011c3bf0,181704048554,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
>>
>> (340da8c014a46272c0c8c830011c3bf0,231524481696,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
>>
>> (340da8c014a46272c0c8c830011c3bf0,271830464992,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
>>
>> (393938d71480a2aaf8e440d1fff709f4,141586046141,393938d71480a2aaf8e440d1fff709f4,393938d71480a2aaf8e440d1fff709f4)
>>
>> (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
>>
>> (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
>>
>>
>>
>> The RDD is of type org.apache.spark.rdd.RDD[(Long,
>>  com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord)]
>>
>> At the time of context creation i did this
>> val conf = new SparkConf()
>>   .setAppName(detail)
>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>   .set("spark.yarn.maxAppAttempts", "1")
>>   .registerKryoClasses(Array(
>>     classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum],
>>     classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord],
>>     classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.InputRecord],
>>     classOf[com.ebay.ep.poc.spark.reporting.process.model.SessionRecord],
>>     classOf[com.ebay.ep.poc.spark.reporting.process.model.DataRecord],
>>     classOf[com.ebay.ep.poc.spark.reporting.process.model.ExperimentationRecord]))
>>
>> The class hierarchy is
>>
>> DetailInputRecord extends InputRecord extends SessionRecord extends
>

Re: How to load avro file into spark not on Hadoop in pyspark?

2015-04-13 Thread sa
Note that I am running pyspark in local mode (I do not have a hadoop cluster
connected) as I want to be able to work with the avro file outside of
hadoop.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-load-avro-file-into-spark-not-on-Hadoop-in-pyspark-tp22480p22481.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Need some guidance

2015-04-13 Thread Dean Wampler
That appears to work, with a few changes to get the types correct:

input.distinct().combineByKey((s: String) => 1, (agg: Int, s: String) =>
agg + 1, (agg1: Int, agg2: Int) => agg1 + agg2)
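
For readers following along in Python, here is a rough plain-Python model of what the three functions do. `combine_by_key` is a hypothetical helper written for this sketch, not Spark's implementation, and the split into two partitions is arbitrary; it only shows when each function is called.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python model of combineByKey over a list of partitions."""
    partials = []
    for part in partitions:
        acc = {}
        for key, value in part:
            if key in acc:
                acc[key] = merge_value(acc[key], value)   # key already seen in this partition
            else:
                acc[key] = create_combiner(value)         # first value for this key here
        partials.append(acc)
    merged = {}
    for acc in partials:
        for key, comb in acc.items():
            # combine the per-partition results for the same key
            merged[key] = merge_combiners(merged[key], comb) if key in merged else comb
    return merged

data = [(1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
        (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")]

distinct_pairs = sorted(set(data))                      # stands in for input.distinct()
partitions = [distinct_pairs[:2], distinct_pairs[2:]]   # arbitrary two-way split

counts = combine_by_key(
    partitions,
    lambda s: 1,             # createCombiner
    lambda agg, s: agg + 1,  # mergeValue
    lambda a, b: a + b,      # mergeCombiners
)
print(sorted(counts.items()))  # [(1, 3), (2, 2), (3, 1)]
```

Because key 1 lands in both partitions here, all three functions get exercised, and the result matches the desired outcome from the original question.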

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen  wrote:

> How about this?
>
> input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
> (agg1: Int, agg2: Int) => agg1 + agg2).collect()
>
> On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler 
> wrote:
>
>> The problem with using collect is that it will fail for large data sets,
>> as you'll attempt to copy the entire RDD to the memory of your driver
>> program. The following works (Scala syntax, but similar to Python):
>>
>> scala> val i1 = input.distinct.groupByKey
>> scala> i1.foreach(println)
>> (1,CompactBuffer(beta, alpha, foo))
>> (3,CompactBuffer(foo))
>> (2,CompactBuffer(alpha, bar))
>>
>> scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
>> scala> i2.foreach(println)
>> (1,3)
>> (3,1)
>> (2,2)
>>
>> The "i2" line passes a function that takes a tuple argument, then
>> constructs a new output tuple with the first element and the size of the
>> second (each CompactBuffer). An alternative pattern match syntax would be.
>>
>> scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }
>>
>> This should work as long as none of the CompactBuffers are too large,
>> which could happen for extremely large data sets.
>>
>> dean
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>>  (O'Reilly)
>> Typesafe 
>> @deanwampler 
>> http://polyglotprogramming.com
>>
>> On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw 
>> wrote:
>>
>>> **Learning the ropes**
>>>
>>> I'm trying to grasp the concept of using the pipeline in pySpark...
>>>
>>> Simplified example:
>>> >>> list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]
>>>
>>> Desired outcome:
>>> [(1,3),(2,2),(3,1)]
>>>
>>> Basically for each key, I want the number of unique values.
>>>
>>> I've tried different approaches, but am I really using Spark
>>> effectively?  I wondered if I would do something like:
>>> >>> input=sc.parallelize(list)
>>> >>> input.groupByKey().collect()
>>>
>>> Then I wondered if I could do something like a foreach over each key
>>> value, and then map the actual values and reduce them.  Pseudo-code:
>>>
>>> input.groupbykey()
>>> .keys
>>> .foreach(_.values
>>> .map(lambda x: x,1)
>>> .reducebykey(lambda a,b:a+b)
>>> .count()
>>> )
>>>
>>> I was somehow hoping that the key would get the current value of count,
>>> and thus be the count of the unique keys, which is exactly what I think I'm
>>> looking for.
>>>
>>> Am I way off base on how I could accomplish this?
>>>
>>> Marco
>>>
>>
>>
>


Rack locality

2015-04-13 Thread rcharaya
I want to use the rack locality feature of Apache Spark in my application.

Is YARN the only resource manager that supports it as of now?

After going through the source code, I found that the default implementation of
the getRackForHost() method in TaskSchedulerImpl returns None, which (I suppose)
is what standalone mode would use.

On the other hand, it is overridden in YarnScheduler.scala to fetch the rack
information by invoking Hadoop's RackResolver API, which would be used when
it's run on YARN.
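
To make the override pattern described above concrete, here is a small plain-Python model. The class names, the `topology` dictionary, and the `locality` helper are all hypothetical and only mirror the shape of the Scala code as I read it; they are not Spark classes.

```python
class BaseScheduler:
    """Model of a scheduler whose default rack lookup knows nothing about racks."""
    def get_rack_for_host(self, host):
        return None

class TopologyAwareScheduler(BaseScheduler):
    """Model of a subclass that overrides the lookup with a host-to-rack mapping."""
    def __init__(self, topology):
        self.topology = topology

    def get_rack_for_host(self, host):
        return self.topology.get(host)

def locality(scheduler, task_host, preferred_host):
    """Classify a placement as node-local, rack-local, or neither."""
    if task_host == preferred_host:
        return "NODE_LOCAL"
    rack = scheduler.get_rack_for_host(task_host)
    if rack is not None and rack == scheduler.get_rack_for_host(preferred_host):
        return "RACK_LOCAL"
    return "ANY"

topology = {"h1": "/rack1", "h2": "/rack1", "h3": "/rack2"}
print(locality(BaseScheduler(), "h1", "h2"))
print(locality(TopologyAwareScheduler(topology), "h1", "h2"))
print(locality(TopologyAwareScheduler(topology), "h1", "h3"))
```

With the base class, two different hosts can never be rack-local because no rack is ever reported, which is the behaviour I would expect from the default implementation.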



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Rack-locality-tp22483.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Rack locality

2015-04-13 Thread Sandy Ryza
Hi Riya,

As far as I know, that is correct, unless Mesos fine-grained mode handles
this in some mysterious way.

-Sandy

On Mon, Apr 13, 2015 at 2:09 PM, rcharaya  wrote:

> I want to use Rack locality feature of Apache Spark in my application.
>
> Is YARN the only resource manager which supports it as of now?
>
> After going through the source code, I found that default implementation of
> getRackForHost() method returns NONE in TaskSchedulerImpl which (I suppose)
> would be used by standalone mode.
>
> On the other hand, it is overriden in YarnScheduler.scala to fetch the rack
> information by invoking RackResolver api of hadoop which would be used when
> its run on YARN.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Rack-locality-tp22483.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: org.apache.spark.ml.recommendation.ALS

2015-04-13 Thread Jay Katukuri

Hi Xiangrui,

Here is the class:


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.Row

object ALSNew {

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("TrainingDataPurchase")
      .set("spark.executor.memory", "4g")

    conf.set("spark.shuffle.memoryFraction", "0.65") // default is 0.2
    conf.set("spark.storage.memoryFraction", "0.3")  // default is 0.6

    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val pfile = args(0)
    val purchase = sc.textFile(pfile)

    // Parse "user,item,rate" lines; name the columns the way ALS expects by default.
    val ratings = purchase.map(line =>
      line.split(',') match { case Array(user, item, rate) =>
        (user.toInt, item.toInt, rate.toFloat)
      }).toDF("user", "item", "rating")

    val rank = args(1).toInt
    val numIterations = args(2).toInt
    val regParam: Double = 0.01
    val implicitPrefs: Boolean = true
    val numUserBlocks: Int = 100
    val numItemBlocks: Int = 100
    val nonnegative: Boolean = true

    //val paramMap = ParamMap (regParam=0.01)
    //paramMap.put(numUserBlocks=100,  numItemBlocks=100)
    val als = new ALS()
      .setRank(rank)
      .setRegParam(regParam)
      .setImplicitPrefs(implicitPrefs)
      .setNumUserBlocks(numUserBlocks)
      .setNumItemBlocks(numItemBlocks)

    val alpha = als.getAlpha

    val model = als.fit(ratings)

    val predictions = model.transform(ratings)
      .select("rating", "prediction")
      .map { case Row(rating: Float, prediction: Float) =>
        (rating.toDouble, prediction.toDouble)
      }
    val rmse =
      if (implicitPrefs) {
        // TODO: Use a better (rank-based?) evaluation metric for implicit feedback.
        // We limit the ratings and the predictions to interval [0, 1] and compute the
        // weighted RMSE with the confidence scores as weights.
        val (totalWeight, weightedSumSq) = predictions.map { case (rating, prediction) =>
          val confidence = 1.0 + alpha * math.abs(rating)
          val rating01 = math.max(math.min(rating, 1.0), 0.0)
          val prediction01 = math.max(math.min(prediction, 1.0), 0.0)
          val err = prediction01 - rating01
          (confidence, confidence * err * err)
        }.reduce { case ((c0, e0), (c1, e1)) =>
          (c0 + c1, e0 + e1)
        }
        math.sqrt(weightedSumSq / totalWeight)
      } else {
        val mse = predictions.map { case (rating, prediction) =>
          val err = rating - prediction
          err * err
        }.mean()
        math.sqrt(mse)
      }

    println("Root Mean Squared Error = " + rmse)
  }
}
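
As a sanity check on the implicit-feedback branch, here is the same confidence-weighted RMSE written as a small plain-Python function. The function name and the `alpha=1.0` default are assumptions made for this sketch; in the Scala code, alpha comes from `als.getAlpha`.

```python
import math

def weighted_rmse(pairs, alpha=1.0):
    """Confidence-weighted RMSE over (rating, prediction) pairs, both clipped to [0, 1]."""
    total_weight = 0.0
    weighted_sum_sq = 0.0
    for rating, prediction in pairs:
        confidence = 1.0 + alpha * abs(rating)
        rating01 = max(min(rating, 1.0), 0.0)
        prediction01 = max(min(prediction, 1.0), 0.0)
        err = prediction01 - rating01
        total_weight += confidence
        weighted_sum_sq += confidence * err * err
    return math.sqrt(weighted_sum_sq / total_weight)

# Two observations: weights 2 and 1, squared errors 0.25 each -> sqrt(0.75 / 3) = 0.5
print(weighted_rmse([(1.0, 0.5), (0.0, 0.5)]))
```

Working a tiny case by hand like this makes it easier to see that a non-zero rating counts more heavily than a zero rating.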




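I am using the following in my maven build (pom.xml): 

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
</dependencies>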


I am using scala version 2.11.2.

Could it be that "spark-1.3.0-bin-hadoop2.4.tgz" requires a different version 
of Scala?

Thanks,
Jay



On Apr 9, 2015, at 4:38 PM, Xiangrui Meng  wrote:

> Could you share ALSNew.scala? Which Scala version did you use? -Xiangrui
> 
> On Wed, Apr 8, 2015 at 4:09 PM, Jay Katukuri  wrote:
>> Hi Xiangrui,
>> 
>> I tried running this on my local machine  (laptop) and got the same error:
>> 
>> Here is what I did:
>> 
>> 1. downloaded spark 1.30 release version (prebuilt for hadoop 2.4 and later)
>> "spark-1.3.0-bin-hadoop2.4.tgz".
>> 2. Ran the following command:
>> 
>> spark-submit --class ALSNew  --master local[8] ALSNew.jar  /input_path
>> 
>> 
>> The stack trace is exactly same.
>> 
>> Thanks,
>> Jay
>> 
>> 
>> 
>> On Apr 8, 2015, at 10:47 AM, Jay Katukuri  wrote:
>> 
>> some additional context:
>> 
>> Since, I am using features of spark 1.3.0, I have downloaded spark 1.3.0 and
>> used spark-submit from there.
>> The cluster is still on spark-1.2.0.
>> 
>> So, this looks to me that at runtime, the executors could not find some
>> libraries of spark-1.3.0, even though I ran spark-submit from my downloaded
>> spark-1.30.
>> 
>> 
>> 
>> On Apr 6, 2015, at 1:37 PM, Jay Katukuri  wrote:
>> 
>> Here is the command that I have used :
>> 
>> spark-submit --class packagename.ALSNew --num-executors 100 --master yarn
>> ALSNew.jar -jar spark-sql_2.11-1.3.0.jar hdfs://input_path
>> 
>> Btw - I could run the old ALS in mllib package.
>> 
>> 
>> 
>> 
>> 
>> On Apr 6, 2015, at 12:32 PM, Xiangrui Meng  wrote:
>> 
>> So ALSNew.scala is your own application. Did you add it with
>> spark-submit or spark-shell? The correct command should look like
>> 
>> spark-submit --class your.package.name.ALSNew ALSNew.jar [options]
>> 
>> Please check the documentation:
>> http://spark.apache.org/docs/latest/submitting-applications.html
>> 
>> -Xiangrui
>> 
>> On Mon, Apr 6, 2015 at 12:27 PM, Jay Katukuri  wrote:
>> 
>> Hi,
>> 
>> Here is the stack trace:
>> 
>> 
>> Exception in 

Registering classes with KryoSerializer

2015-04-13 Thread Arun Lists
Hi,

I am trying to register classes with KryoSerializer. This has worked with
other programs. Usually the error messages are helpful in indicating which
classes need to be registered. But with my current program, I get the
following cryptic error message:

*Caused by: java.lang.IllegalArgumentException: Class is not registered:
scala.reflect.ClassTag$$anon$1*

*Note: To register this class use:
kryo.register(scala.reflect.ClassTag$$anon$1.class);*

How do I find out which class needs to be registered? I looked at my
program and registered all classes used in RDDs. But clearly more classes
remain to be registered if I can figure out which classes.

Thanks for your help!

arun


Spark Worker IP Error

2015-04-13 Thread DStrip
I tried to start the Spark Worker using the registered IP but this error
occurred:

15/04/13 21:35:59 INFO Worker: Registered signal handlers for [TERM, HUP, INT]
Exception in thread "main" java.net.UnknownHostException: 10.240.92.75/: Name or service not known
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901)
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293)
at java.net.InetAddress.getAllByName0(InetAddress.java:1246)
at java.net.InetAddress.getAllByName(InetAddress.java:1162)
at java.net.InetAddress.getAllByName(InetAddress.java:1098)
at java.net.InetAddress.getByName(InetAddress.java:1048)
at org.apache.spark.util.Utils$.getAddressHostName(Utils.scala:819)
at org.apache.spark.util.Utils$.localIpAddressHostname$lzycompute(Utils.scala:763)
at org.apache.spark.util.Utils$.localIpAddressHostname(Utils.scala:763)
at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:815)
at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:815)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.util.Utils$.localHostName(Utils.scala:815)
at org.apache.spark.deploy.worker.WorkerArguments.(WorkerArguments.scala:29)
at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:528)
at org.apache.spark.deploy.worker.Worker.main(Worker.scala)

My concern is that I deployed the Spark Master using the same process
(assigning the corresponding IP) and everything works fine. I have also
looked at the /etc/hosts file, and 10.240.92.75 is up and working fine.
Moreover, I have switched from IPv6 to IPv4 using _JAVA_OPTIONS:
-Djava.net.preferIPv4Stack=true, but the error still kept coming up. I am
a little bit confused; I would be grateful for any suggestions/help.
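
One thing that stands out in the exception text is that the host being resolved is printed as `10.240.92.75/`, with a trailing slash. This is a guess rather than a diagnosis, but a stray character would stop the string from being a literal IP address, so the resolver would treat it as a host name. A minimal plain-Python check of that idea, where `is_literal_ip` is a hypothetical helper for this sketch:

```python
import ipaddress

def is_literal_ip(host):
    """Return True if `host` is a bare IPv4/IPv6 literal with no extra characters."""
    try:
        ipaddress.ip_address(host)
        return True
    except ValueError:
        return False

print(is_literal_ip("10.240.92.75"))   # bare address
print(is_literal_ip("10.240.92.75/"))  # host string as it appears in the exception
```

If the second form is what ends up in the worker's configuration or environment, checking where that value is set might be a reasonable first step.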



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Worker-IP-Error-tp22484.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




How to access postgresql on Spark SQL

2015-04-13 Thread doovsaid
Hi all,
Does anyone know how to access PostgreSQL from Spark SQL? Do I need to add the PostgreSQL 
dependency in build.sbt and set the class path for it? 
Thanks.
Regards,
Yi

sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Mike Trienis
Hi All,

I am having trouble building a fat jar file through sbt-assembly.

[warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/services/java.sql.Driver' with strategy 'filterDistinctLines'
[warn] Merging 'rootdoc.txt' with strategy 'concat'
[warn] Strategy 'concat' was applied to a file
[warn] Strategy 'discard' was applied to 17 files
[warn] Strategy 'filterDistinctLines' was applied to a file
[warn] Strategy 'rename' was applied to 4 files

When submitting the spark application through the command

sh /usr/lib/spark/bin/spark-submit -class com.xxx.ExampleClassName
target/scala-2.10/-snapshot.jar

I end up with the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat
at com.amazonaws.auth.AWS4Signer.(AWS4Signer.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
at com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
at com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
at com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
at com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
at com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
at com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:216)
at com.amazonaws.services.kinesis.AmazonKinesisClient.(AmazonKinesisClient.java:202)
at com.amazonaws.services.kinesis.AmazonKinesisClient.(AmazonKinesisClient.java:175)
at com.amazonaws.services.kinesis.AmazonKinesisClient.(AmazonKinesisClient.java:155)
at com.quickstatsengine.aws.AwsProvider$.(AwsProvider.scala:20)
at com.quickstatsengine.aws.AwsProvider$.(AwsProvider.scala)

The snippet from my build.sbt file is:

"org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha1" % "provided",
"org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0" % "provided",

And the error is originating from:

val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())

Am I correct to set spark-streaming-kinesis-asl as a *provided* dependency?
Also, is there a merge strategy I need to apply?

Any help would be appreciated, Mike.


Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Vadim Bichutskiy
I don't believe the Kinesis asl should be provided. I used mergeStrategy 
successfully to produce an "uber jar."
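
A build.sbt sketch of that suggestion, under a few assumptions: the versions are copied from the snippet in this thread, the merge-strategy syntax is the one used by sbt-assembly 0.12 and later (older plugin versions spell it differently), and the single MANIFEST.MF rule is only an illustration of where custom rules would go. Whether this resolves the NoClassDefFoundError above is not confirmed anywhere in the thread.

```scala
// build.sbt (sketch, not a verified fix)
libraryDependencies ++= Seq(
  // Supplied by the cluster at runtime, so kept out of the uber jar.
  "org.apache.spark" %% "spark-core"      % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
  // No "provided" here: the Kinesis connector is not part of the Spark
  // distribution, so it and its transitive dependencies get bundled.
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0"
)

// Custom rules first, then fall back to the plugin's default strategy.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
  case other =>
    val defaultStrategy = (assemblyMergeStrategy in assembly).value
    defaultStrategy(other)
}
```

The "[warn] Merging ..." lines in the original post look like the output of the plugin's default strategies, so a custom rule is probably only needed when the build reports an actual deduplicate error.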

Fyi, I've been having trouble consuming data out of Kinesis with Spark with no 
success :( 
Would be curious to know if you got it working.

Vadim


Re: Task result in Spark Worker Node

2015-04-13 Thread Imran Rashid
On the worker side, it all happens in Executor.  The task result is
computed here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210

Then it's serialized along with some other goodies, and finally sent back to
the driver here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255

What happens on the driver is quite a bit more complicated, and involves a
number of spots in the code, but at least to get you started, the results
are received here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L328

though perhaps a more interesting spot is where they are handled in
DAGScheduler#handleTaskCompletion:
https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1001


Also, I think I know what you mean, but just to make sure: I wouldn't say
the results from the worker are "broadcast" back to the driver.  (a) in
spark, "broadcast" tends to refer to a particular api for sharing immutable
data from the driver to the workers (only one direction) and (b) it doesn't
really fit a more general meaning of "broadcast" either, since the results
are sent only to the driver, not to all nodes.

On Sun, Mar 29, 2015 at 8:34 PM, raggy  wrote:

> I am a PhD student working on a research project related to Apache Spark. I
> am trying to modify some of the spark source code such that instead of
> sending the final result RDD from the worker nodes to a master node, I want
> to send the final result RDDs to some different node. In order to do this,
> I have been trying to identify at which point the Spark worker nodes
> broadcast the results of a job back to the master.
>
> So far, I understand that in Spark, the master serializes the RDD and the
> functions to be applied on them and sends them over to the worker nodes. In
> the context of reduce, it serializes the RDD partition and the reduce
> function and sends them to the worker nodes. However, my understanding of
> how things happen at the worker node is very limited and I would appreciate
> it if someone could help me identify where the process of broadcasting the
> results of local worker computations back to the master node takes place.
>
> This is some of the limited knowledge that I have about the worker nodes:
>
> Each job gets divided into smaller sets of tasks called stages. Each Stage
> is either a Shuffle Map Stage or Result Stage. In a Shuffle Map Stage, the
> task results are used as input for another stage. The result stage uses the
> RDD to compute the action that initiated the job. So, this result stage
> executes the last task for the job on the worker node. I would assume after
> this is done, it gets the result and broadcasts it to the driver
> application(the master).
>
> In ResultTask.scala (spark-core, src/main/scala, org.apache.spark.scheduler)
> it states "A task that sends back the output to the driver application."
> However, I don't see when or where this happens in the source code. I would
> very much appreciate it if someone could help me identify where this
> happens in the Spark source code.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Task-result-in-Spark-Worker-Node-tp22283.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Mike Trienis
Thanks Vadim, I can certainly consume data from a Kinesis stream when
running locally. I'm currently in the process of extending my work to a
proper cluster (i.e. using a spark-submit job via uber jar). Feel free to
add me to gmail chat and maybe we can help each other.


Re: How to get rdd count() without double evaluation of the RDD?

2015-04-13 Thread Imran Rashid
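Yes, it sounds like a good use of an accumulator to me:

val counts = sc.accumulator(0L)
rdd.map { x =>
  counts += 1  // incremented on the executors as each element passes through
  x
}.saveAsObjectFile(file2)
// Read the total on the driver only after the action above has finished.
println(counts.value)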


On Mon, Mar 30, 2015 at 12:08 PM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

>  Sean
>
>
>
> Yes, I know that I can use persist() to persist to disk, but persisting a
> huge RDD to disk is still a big extra cost. I hope that I can get the count
> and call rdd.saveAsObjectFile(file2) in one pass, but I don’t know how.
>
> Maybe use an accumulator to count the total?
>
>
>
> Ningjun
>
>
>
> *From:* Mark Hamstra [mailto:m...@clearstorydata.com]
> *Sent:* Thursday, March 26, 2015 12:37 PM
> *To:* Sean Owen
> *Cc:* Wang, Ningjun (LNG-NPV); user@spark.apache.org
> *Subject:* Re: How to get rdd count() without double evaluation of the
> RDD?
>
>
>
> You can also always take the more extreme approach of using
> SparkContext#runJob (or submitJob) to write a custom Action that does what
> you want in one pass.  Usually that's not worth the extra effort.
>
>
>
> On Thu, Mar 26, 2015 at 9:27 AM, Sean Owen  wrote:
>
> To avoid computing twice you need to persist the RDD, but that need not be
> in memory. You can persist to disk with persist(StorageLevel.DISK_ONLY).
>
> On Mar 26, 2015 4:11 PM, "Wang, Ningjun (LNG-NPV)" <
> ningjun.w...@lexisnexis.com> wrote:
>
> I have an RDD that is expensive to compute. I want to save it as an object
> file and also print the count. How can I avoid double computation of the
> RDD?
>
>
>
> val rdd = sc.textFile(someFile).map(line => expensiveCalculation(line))
>
>
>
> val count = rdd.count()  // this forces computation of the RDD
>
> println(count)
>
> rdd.saveAsObjectFile(file2) // this computes the RDD again
>
>
>
> I can avoid double computation by using cache
>
>
>
> val rdd = sc.textFile(someFile).map(line => expensiveCalculation(line))
>
> rdd.cache()
>
> val count = rdd.count()
>
> println(count)
>
> rdd.saveAsObjectFile(file2) // this reads the cached RDD instead of recomputing it
>
>
>
> This computes the RDD only once. However, the RDD has millions of items and
> caching it will cause an out-of-memory error.
>
>
>
> Question: how can I avoid double computation without using cache?
>
>
>
>
>
> Ningjun
>
>
>


Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Vadim Bichutskiy
Thanks Mike. I was having trouble on EC2.


Re: Registering classes with KryoSerializer

2015-04-13 Thread Imran Rashid
Those funny class names come from Scala's specialization -- it's compiling a
different version of OpenHashMap for each primitive you stick in the type
parameter.  Here's a super simple example:

➜  ~  more Foo.scala
class Foo[@specialized X]

➜  ~  scalac Foo.scala

➜  ~  ls Foo*.class
Foo$mcB$sp.class Foo$mcC$sp.class Foo$mcD$sp.class Foo$mcF$sp.class
Foo$mcI$sp.class Foo$mcJ$sp.class Foo$mcS$sp.class Foo$mcV$sp.class
Foo$mcZ$sp.class Foo.class

Sadly, I'm not sure of a foolproof way of getting all those specialized
versions registered except for registering with these strange names.
Here's an example of how it's done by chill for Tuples (which is what Spark
is relying on for its own registration of tuples):

https://github.com/twitter/chill/blob/6d03f6976f33f6e2e16b8e254fead1625720c281/chill-scala/src/main/scala/com/twitter/chill/TupleSerializers.scala#L861
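
As a small sketch of what "registering with these strange names" could look like: the suffix letter is the JVM type descriptor of the primitive (I for Int, J for Long, D for Double, and so on), so OpenHashMap$mcI$sp would be the Int specialization. The Box class and the SpecializedNames helper below are hypothetical, the behaviour shown is that of Scala 2 (Scala 3 ignores @specialized), and the Spark call appears only in a comment because it needs Spark on the classpath.

```scala
// Hypothetical class, specialized for Int and Long only.
class Box[@specialized(Int, Long) T](val value: T)

object SpecializedNames {
  // Runtime class name of a Box that holds a primitive Int.
  def intBoxClassName: String = new Box(42).getClass.getName

  // Resolve a class from its (possibly mangled) name.
  def lookup(name: String): Class[_] = Class.forName(name)

  def main(args: Array[String]): Unit = {
    val name = intBoxClassName
    println(name)
    // The looked-up class is the same one the instance reports.
    println(lookup(name) == new Box(1).getClass)
    // With Spark available, the Class object could then be registered:
    //   sparkConf.registerKryoClasses(Array(lookup(name)))
  }
}
```

Looking classes up by name avoids having to spell the mangled identifier in source code, at the cost of a runtime failure if the name is wrong.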

On Mon, Mar 30, 2015 at 3:59 PM, Arun Lists  wrote:

> I am trying to register classes with KryoSerializer. I get the following
> error message:
>
> How do I find out what class is being referred to by: *OpenHashMap$mcI$sp
> ?*
>
> *com.esotericsoftware.kryo.KryoException:
> java.lang.IllegalArgumentException: Class is not registered:
> com.comp.common.base.OpenHashMap$mcI$sp*
>
> *Note: To register this class use: *
> *kryo.register(com.dtex.common.base.OpenHashMap$mcI$sp.class);*
>
> I have registered other classes with it by using:
>
> sparkConf.registerKryoClasses(Array(
>   classOf[MyClass]
> ))
>
>
> Thanks,
>
> arun
>
>
>


Re: Understanding Spark Memory distribution

2015-04-13 Thread Imran Rashid
broadcast variables count towards "spark.storage.memoryFraction", so they
use the same "pool" of memory as cached RDDs.
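
A rough worked example of where the 4.5 GB figure in the question probably comes from. As far as I know, Spark 1.x sizes this pool as max heap x spark.storage.memoryFraction x spark.storage.safetyFraction, with the safety fraction defaulting to 0.9; the helper name below is made up for illustration.

```scala
object StoragePool {
  // Approximate size of the storage pool, in GB, for a given executor heap.
  def storagePoolGb(heapGb: Double,
                    memoryFraction: Double,
                    safetyFraction: Double = 0.9): Double =
    heapGb * memoryFraction * safetyFraction

  def main(args: Array[String]): Unit = {
    // Values from the question: 26 GB executors, memoryFraction = 0.2.
    val pool = storagePoolGb(heapGb = 26.0, memoryFraction = 0.2)
    println(f"storage pool: $pool%.2f GB")
  }
}
```

26 x 0.2 x 0.9 is about 4.68 GB. The JVM usually reports a maximum heap somewhat below the configured size, which would account for the 4.5 GB shown in the logs. Under this formula a 3 GB broadcast leaves little room in the pool, so raising spark.storage.memoryFraction is the obvious knob to try.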

That being said, I'm really not sure why you are running into problems; it
seems like you have plenty of memory available.  Most likely it has
nothing to do with broadcast variables or caching, and it's just the
logic in your transformations that is causing lots of GC
during the computation.  Hard to say without knowing more details.

You could try increasing the timeout for the failed askWithReply by
increasing "spark.akka.lookupTimeout" (defaults to 30 seconds), but that would most
likely be treating a symptom, not the root cause.

On Fri, Mar 27, 2015 at 4:52 PM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:

> Hi All,
>
> I am running a spark cluster on EC2 instances of type: m3.2xlarge. I have
> given 26gb of memory with all 8 cores to my executors. I can see that in
> the logs too:
>
> *15/03/27 21:31:06 INFO AppClient$ClientActor: Executor added:
> app-20150327213106-/0 on worker-20150327212934-10.x.y.z-40128
> (10.x.y.z:40128) with 8 cores*
>
> I am not caching any RDD so I have set "spark.storage.memoryFraction" to
> 0.2. I can see on SparkUI under executors tab Memory used is 0.0/4.5 GB.
>
> I am now confused with these logs?
>
> *15/03/27 21:31:08 INFO BlockManagerMasterActor: Registering block manager
> 10.77.100.196:58407  with 4.5 GB RAM,
> BlockManagerId(4, 10.x.y.z, 58407)*
>
> I am broadcasting a large object of 3 gb and after that when I am creating
> an RDD, I see logs which show this 4.5 GB memory getting full and then I
> get OOM.
>
> How can I make block manager use more memory?
>
> Is there any other fine tuning I need to do for broadcasting large objects?
>
> And does broadcast variable use cache memory or rest of the heap?
>
>
> Thanks
>
> Ankur
>


Re: [GraphX] aggregateMessages with active set

2015-04-13 Thread James
Hello,

Many thanks for your reply. From the code I found that my program scans
all the edges because the EdgeDirection I passed in is
EdgeDirection.Either.

However, I still have the problem that the time taken by each iteration
does not decrease as the iterations proceed. So I have two questions:

1. What is the meaning of activeFraction in [1]?
2. As my edgeRDD is too large to cache in memory, I used
StorageLevel.MEMORY_AND_DISK_SER as the persist level. If the program uses
"aggregateMessagesIndexScan", will it still have to load the whole edge
list into memory?

[1]
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L237-266
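
For readers following the thread, here is a toy sketch of the clustered-index idea described in the quoted reply below: edges sorted by source id, plus a map from source id to the offset of its first edge. It is a simplified illustration with made-up names (Edge, ClusteredEdges, edgeScan, indexScan), not GraphX's actual implementation.

```scala
final case class Edge(src: Long, dst: Long)

final class ClusteredEdges(unsorted: Seq[Edge]) {
  // Edges sorted by source id, so each source's edges form one contiguous cluster.
  val edges: Vector[Edge] = unsorted.sortBy(_.src).toVector

  // Source id -> offset of the first edge in that source's cluster.
  val index: Map[Long, Int] =
    edges.zipWithIndex.foldLeft(Map.empty[Long, Int]) {
      case (acc, (edge, offset)) =>
        if (acc.contains(edge.src)) acc else acc + (edge.src -> offset)
    }

  // Full scan: visits every edge, whatever the size of the active set.
  def edgeScan(active: Set[Long]): Vector[Edge] =
    edges.filter(e => active.contains(e.src))

  // Index scan: jumps straight to the clusters of the active sources.
  def indexScan(active: Set[Long]): Vector[Edge] = {
    val out = Vector.newBuilder[Edge]
    active.toVector.sorted.foreach { src =>
      index.get(src).foreach { start =>
        var i = start
        while (i < edges.length && edges(i).src == src) {
          out += edges(i)
          i += 1
        }
      }
    }
    out.result()
  }
}
```

Both scans return the same edges; the index scan's cost grows with the number of active sources and their edges rather than with the total edge count, which is why it only pays off when a small share of the sources is active.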

Alcaid


2015-04-10 2:47 GMT+08:00 Ankur Dave :

> Actually, GraphX doesn't need to scan all the edges, because it
> maintains a clustered index on the source vertex id (that is, it sorts
> the edges by source vertex id and stores the offsets in a hash table).
> If the activeDirection is appropriately set, it can then jump only to
> the clusters with active source vertices.
>
> See the EdgePartition#index field [1], which stores the offsets, and
> the logic in GraphImpl#aggregateMessagesWithActiveSet [2], which
> decides whether to do a full scan or use the index.
>
> [1]
> https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala#L60
> [2].
> https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L237-266
>
> Ankur
>
>
> On Thu, Apr 9, 2015 at 3:21 AM, James  wrote:
> > In aggregateMessagesWithActiveSet, Spark still has to read all edges. This
> > means that a fixed cost that scales with graph size is unavoidable in
> > each pregel-like iteration.
> >
> > But what if I have to run nearly 100 iterations, and in the last 50
> > iterations fewer than 0.1% of the nodes need to be updated? The fixed
> > cost makes the program's total running time unacceptable.
>


How can I add my custom Rule to spark sql?

2015-04-13 Thread Andy Zhao
Hi guys,

I want to add my own custom Rules (whatever the rules are) to be applied when
the SQL logical plan is being analysed.
Is there a way to do that in the Spark application code?


Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-add-my-custom-Rule-to-spark-sql-tp22485.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: [Spark1.3] UDF registration issue

2015-04-13 Thread Takeshi Yamamuro
Hi,

That is a syntax error in Spark 1.3.
The next release of Spark will support this kind of UDF call on DataFrames.
See the link below.

https://issues.apache.org/jira/browse/SPARK-6379


On Sat, Apr 11, 2015 at 3:30 AM, Yana Kadiyska 
wrote:

> Hi, I'm running into some trouble trying to register a UDF:
>
> scala> sqlContext.udf.register("strLen", (s: String) => s.length())
> res22: org.apache.spark.sql.UserDefinedFunction =
> UserDefinedFunction(<function1>,IntegerType)
>
> scala> cleanProcessDF.withColumn("dii",strLen(col("di")))
> <console>:33: error: not found: value strLen
>   cleanProcessDF.withColumn("dii",strLen(col("di")))
>
> Where cleanProcessDF is a dataframe
> Is my syntax wrong? Or am I missing an import of some sort?
>



-- 
---
Takeshi Yamamuro


Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database

2015-04-13 Thread sachin Singh
Hi Linlin,
Have you found a solution for this issue? If so, what needs to be
corrected? I am getting the same error when submitting a
Spark job in cluster mode. The error is as follows:
2015-04-14 18:16:43 DEBUG Transaction - Transaction rolled back in 0 ms
2015-04-14 18:16:43 ERROR DDLTask - org.apache.hadoop.hive.ql.metadata.HiveException: Database does not exist: my_database
    at org.apache.hadoop.hive.ql.exec.DDLTask.switchDatabase(DDLTask.java:4054)
    at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:269)
    at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
    at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java
...


Please advise. I have copied hive-site.xml into spark/conf; in standalone
mode it works fine.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-failed-in-yarn-cluster-mode-when-connecting-to-non-default-hive-database-tp11811p22486.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Fwd: Expected behavior for DataFrame.unionAll

2015-04-13 Thread Justin Yip
Hello,

I am experimenting with DataFrame. I tried to construct two DataFrames with:
1. case class A(a: Int, b: String)
scala> adf.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: string (nullable = true)

2. case class B(a: String, c: Int)
scala> bdf.printSchema()
root
 |-- a: string (nullable = true)
 |-- c: integer (nullable = false)


Then I unioned these two DataFrames with the unionAll function, and I
got the following schema. It is kind of a mixture of A and B.

scala> val udf = adf.unionAll(bdf)
scala> udf.printSchema()
root
 |-- a: string (nullable = false)
 |-- b: string (nullable = true)

The unionAll documentation says it behaves like the SQL UNION ALL function.
However, unioning incompatible types is not well defined for SQL. Is there
any expected behavior for unioning incompatible data frames?

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Expected-behavior-for-DataFrame-unionAll-tp22487.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: RDD generated on every query

2015-04-13 Thread Akhil Das
You can use Tachyon-based storage for that; every time the client
queries, you just read the data from there.

Thanks
Best Regards

On Mon, Apr 6, 2015 at 6:01 PM, Siddharth Ubale  wrote:

>  Hi ,
>
>
>
> In our Spark web application, the RDD is regenerated every time a client
> sends a query request. Is there a way to build the RDD once and then run
> queries against it repeatedly on an active SparkContext?
>
>
>
> Thanks,
>
> Siddharth Ubale,
>
> *Synchronized Communications *
>
> *#43, Velankani Tech Park, Block No. II, *
>
> *3rd Floor, Electronic City Phase I,*
>
> *Bangalore – 560 100*
>
> *Tel : +91 80 3202 4060*
>
> *Web:* *www.syncoms.com* 
>
>
>


Re: RDD generated on every query

2015-04-13 Thread twinkle sachdeva
Hi,

If you have the same Spark context, then you can cache the query result by
caching the table ( sqlContext.cacheTable("tableName") ).

You could also have a look at the Ooyala job server.



On Tue, Apr 14, 2015 at 11:36 AM, Akhil Das 
wrote:

> You can use Tachyon-based storage for that; every time the client
> queries, you just read it from there.
>
> Thanks
> Best Regards
>
> On Mon, Apr 6, 2015 at 6:01 PM, Siddharth Ubale <
> siddharth.ub...@syncoms.com> wrote:
>
>>  Hi ,
>>
>>
>>
>> In Spark Web Application the RDD is generating every time client is
>> sending a query request. Is there any way where the RDD is compiled once
>> and run query again and again on active SparkContext?
>>
>>
>>
>> Thanks,
>>
>> Siddharth Ubale,
>>
>
>


Re: Expected behavior for DataFrame.unionAll

2015-04-13 Thread Reynold Xin
I think what happened is that the narrowest possible common type was applied.
Type widening is required here, and the narrowest type that can hold both
a string and an int is string.

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L144
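To make the rule above concrete, here is a toy model in plain Java. It is only a sketch and not Spark's code: the class and method names are made up, and the assumptions that columns are paired by position and that the left-hand names are kept are inferred from the schema Justin posted (a: string, b: string), not taken from the linked source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: this is NOT Spark's implementation, just the rule described above.
class UnionSchemaSketch {

    // A column reduced to a name and a type name.
    static final class Column {
        final String name;
        final String type;

        Column(String name, String type) {
            this.name = name;
            this.type = type;
        }

        @Override
        public String toString() {
            return name + ": " + type;
        }
    }

    // Pair columns by position, keep the left-hand name, and widen to
    // "string" whenever the two types differ and one of them is a string.
    static List<Column> unionSchema(List<Column> left, List<Column> right) {
        if (left.size() != right.size()) {
            throw new IllegalArgumentException("column counts differ");
        }
        List<Column> result = new ArrayList<>();
        for (int i = 0; i < left.size(); i++) {
            Column l = left.get(i);
            Column r = right.get(i);
            String type;
            if (l.type.equals(r.type)) {
                type = l.type;
            } else if (l.type.equals("string") || r.type.equals("string")) {
                type = "string";
            } else {
                throw new IllegalArgumentException("no common type for " + l + " and " + r);
            }
            result.add(new Column(l.name, type));
        }
        return result;
    }

    public static void main(String[] args) {
        // Schemas of case class A(a: Int, b: String) and case class B(a: String, c: Int).
        List<Column> a = Arrays.asList(new Column("a", "integer"), new Column("b", "string"));
        List<Column> b = Arrays.asList(new Column("a", "string"), new Column("c", "integer"));
        System.out.println(unionSchema(a, b)); // prints [a: string, b: string]
    }
}
```

Under these assumptions the second column ends up named b with type string, because b (string) is paired with c (integer) purely by position.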


On Tue, Apr 7, 2015 at 5:00 PM, Justin Yip  wrote:

> Hello,
>
> I am experimenting with DataFrame. I tried to construct two DataFrames
> with:
> 1. case class A(a: Int, b: String)
> scala> adf.printSchema()
> root
>  |-- a: integer (nullable = false)
>  |-- b: string (nullable = true)
>
> 2. case class B(a: String, c: Int)
> scala> bdf.printSchema()
> root
>  |-- a: string (nullable = true)
>  |-- c: integer (nullable = false)
>
>
> Then I unioned these two DataFrames with the unionAll function, and I
> got the following schema. It is kind of a mixture of A and B.
>
> scala> val udf = adf.unionAll(bdf)
> scala> udf.printSchema()
> root
>  |-- a: string (nullable = false)
>  |-- b: string (nullable = true)
>
> The unionAll documentation says it behaves like the SQL UNION ALL
> function. However, unioning incompatible types is not well defined for SQL.
> Is there any expected behavior for unioning incompatible data frames?
>
> Thanks.
>
> Justin
>


Re: [Spark1.3] UDF registration issue

2015-04-13 Thread Reynold Xin
You can do this:

import org.apache.spark.sql.functions.{col, udf}

val strLen = udf((s: String) => s.length)
cleanProcessDF.withColumn("dii", strLen(col("di")))

(You might need to play with the type signature a little bit to get it to
compile)


On Fri, Apr 10, 2015 at 11:30 AM, Yana Kadiyska 
wrote:

> Hi, I'm running into some trouble trying to register a UDF:
>
> scala> sqlContext.udf.register("strLen", (s: String) => s.length())
> res22: org.apache.spark.sql.UserDefinedFunction =
> UserDefinedFunction(<function1>,IntegerType)
>
> scala> cleanProcessDF.withColumn("dii",strLen(col("di")))
> <console>:33: error: not found: value strLen
>   cleanProcessDF.withColumn("dii",strLen(col("di")))
>
>
> Where cleanProcessDF is a dataframe
> Is my syntax wrong? Or am I missing an import of some sort?
>


Re: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result

2015-04-13 Thread Akhil Das
One hack would be to copy the Result class into your own code base, make it
implement Serializable, and use that copy.
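A lighter variant of the same idea, in line with the advice quoted below to extract what you need from Result, is to copy only the needed fields into a small class of your own that implements Serializable. The sketch below uses plain Java serialization and no HBase or Spark classes; ExtractedRow, its fields, and roundTrip are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ExtractedRowSketch {

    // Hypothetical holder for the parts of a row the job actually needs.
    static final class ExtractedRow implements Serializable {
        private static final long serialVersionUID = 1L;
        final String rowKey;
        final byte[] value;

        ExtractedRow(String rowKey, byte[] value) {
            this.rowKey = rowKey;
            this.value = value;
        }
    }

    // Serialize to bytes and read back, which is roughly what has to be
    // possible for any object that travels between JVMs.
    static ExtractedRow roundTrip(ExtractedRow row) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(row);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ExtractedRow) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ExtractedRow copy = roundTrip(new ExtractedRow("row-1", new byte[] {1, 2, 3}));
        System.out.println(copy.rowKey + " " + copy.value.length); // prints row-1 3
    }
}
```

In the Spark job, the function passed to map or mapToPair would build such an object from each Result and return it instead of the Result itself.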

Thanks
Best Regards

On Tue, Apr 7, 2015 at 12:07 AM, Jeetendra Gangele 
wrote:

> I hit the same issue again. This time I tried to return the object, and it
> failed with a task-not-serializable error. Below is the code; VendorRecord
> is serializable here.
>
> private static JavaRDD<VendorRecord>
> getVendorDataToProcess(JavaSparkContext sc) throws IOException {
>     return sc
>         .newAPIHadoopRDD(getVendorDataRowKeyScannerConfiguration(),
>             TableInputFormat.class,
>             ImmutableBytesWritable.class, Result.class)
>         .map(new Function<Tuple2<ImmutableBytesWritable, Result>,
>             VendorRecord>() {
>             @Override
>             public VendorRecord call(Tuple2<ImmutableBytesWritable, Result> v1)
>                     throws Exception {
>                 String rowKey = new String(v1._1.get());
>                 VendorRecord vd = vendorDataDAO.getVendorDataForRowkey(rowKey);
>                 return vd;
>             }
>         });
> }
>
>
> On 1 April 2015 at 02:07, Ted Yu  wrote:
>
>> Jeetendra:
>> Please extract the information you need from Result and return the
>> extracted portion - instead of returning Result itself.
>>
>> Cheers
>>
>> On Tue, Mar 31, 2015 at 1:14 PM, Nan Zhu  wrote:
>>
>>> The example in
>>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
>>>  might
>>> help
>>>
>>> Best,
>>>
>>> --
>>> Nan Zhu
>>> http://codingcat.me
>>>
>>> On Tuesday, March 31, 2015 at 3:56 PM, Sean Owen wrote:
>>>
>>> Yep, it's not serializable:
>>>
>>> https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html
>>>
>>> You can't return this from a distributed operation since that would
>>> mean it has to travel over the network and you haven't supplied any
>>> way to convert the thing into bytes.
>>>
>>> On Tue, Mar 31, 2015 at 8:51 PM, Jeetendra Gangele 
>>> wrote:
>>>
>>> When I try to get the result from HBase and run the mapToPair function
>>> of the RDD, it gives the error
>>> java.io.NotSerializableException: org.apache.hadoop.hbase.client.Result
>>>
>>> Here is the code
>>>
>>> // private static JavaPairRDD<Integer, Result>
>>> getCompanyDataRDD(JavaSparkContext sc) throws IOException {
>>> // return sc.newAPIHadoopRDD(companyDAO.getCompnayDataConfiguration(),
>>> TableInputFormat.class, ImmutableBytesWritable.class,
>>> // Result.class).mapToPair(new
>>> PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Result>() {
>>> //
>>> // public Tuple2<Integer, Result> call(Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
>>> // System.out.println("In getCompanyDataRDD"+t._2);
>>> //
>>> // String cknid = Bytes.toString(t._1.get());
>>> // System.out.println("processing cknids is:"+cknid);
>>> // Integer cknidInt = Integer.parseInt(cknid);
>>> // Tuple2<Integer, Result> returnTuple = new Tuple2<Integer, Result>(cknidInt, t._2);
>>> // return returnTuple;
>>> // }
>>> // });
>>> // }
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>>
>>
>
>
>
>


Re: SparkSQL + Parquet performance

2015-04-13 Thread Akhil Das
That depends entirely on your disk I/O and the number of CPUs that you have
in the cluster. For example, with disk I/O of 100 MB/s per machine and a
handful of CPUs (say 40 cores across 10 machines), you could reach roughly
1 GB/s, I believe.
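The arithmetic behind that estimate, as a small sketch. It assumes the scan is purely disk-bound, that each machine sustains the full 100 MB/s, and that throughput scales linearly with the number of machines; the 500,000 MB data set size is a made-up figure for illustration.

```java
class ScanThroughputSketch {

    // Aggregate read rate if every machine reads at its full disk rate.
    static long aggregateMBPerSec(int machines, long diskMBPerSecPerMachine) {
        return machines * diskMBPerSecPerMachine;
    }

    // Seconds needed to scan a data set at that rate (rounded up),
    // ignoring CPU, network and Parquet decoding costs.
    static long secondsToScan(long dataSetMB, int machines, long diskMBPerSecPerMachine) {
        long rate = aggregateMBPerSec(machines, diskMBPerSecPerMachine);
        return (dataSetMB + rate - 1) / rate;
    }

    public static void main(String[] args) {
        System.out.println(aggregateMBPerSec(10, 100));       // prints 1000 (MB/s, about 1 GB/s)
        System.out.println(secondsToScan(500_000L, 10, 100)); // prints 500 (seconds)
    }
}
```

Real numbers will be lower whenever decoding or the network becomes the bottleneck, so treat this as an upper bound.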

Thanks
Best Regards

On Tue, Apr 7, 2015 at 2:48 AM, Paolo Platter 
wrote:

>  Hi all,
>
>  is there anyone using SparkSQL + Parquet that has made a benchmark
> about storing parquet files on HDFS or on CFS ( Cassandra File System )?
>  What storage can improve performance of SparkSQL+ Parquet ?
>
>  Thanks
>
>  Paolo
>
>


Re: Expected behavior for DataFrame.unionAll

2015-04-13 Thread Justin Yip
That explains it. Thanks Reynold.

Justin

On Mon, Apr 13, 2015 at 11:26 PM, Reynold Xin  wrote:

> I think what happened is that the narrowest possible common type was applied.
> Type widening is required here, and the narrowest type that can hold both
> a string and an int is string.
>
>
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L144
>
>
>
> On Tue, Apr 7, 2015 at 5:00 PM, Justin Yip 
> wrote:
>
>> Hello,
>>
>> I am experimenting with DataFrame. I tried to construct two DataFrames
>> with:
>> 1. case class A(a: Int, b: String)
>> scala> adf.printSchema()
>> root
>>  |-- a: integer (nullable = false)
>>  |-- b: string (nullable = true)
>>
>> 2. case class B(a: String, c: Int)
>> scala> bdf.printSchema()
>> root
>>  |-- a: string (nullable = true)
>>  |-- c: integer (nullable = false)
>>
>>
>> Then I unioned these two DataFrames with the unionAll function, and I
>> got the following schema. It is kind of a mixture of A and B.
>>
>> scala> val udf = adf.unionAll(bdf)
>> scala> udf.printSchema()
>> root
>>  |-- a: string (nullable = false)
>>  |-- b: string (nullable = true)
>>
>> The unionAll documentation says it behaves like the SQL UNION ALL
>> function. However, unioning incompatible types is not well defined for SQL.
>> Is there any expected behavior for unioning incompatible data frames?
>>
>> Thanks.
>>
>> Justin
>>
>
>


Re: Seeing message about receiver not being de-registered on invoking Streaming context stop

2015-04-13 Thread Akhil Das
When you say "done fetching documents", do you mean that you are stopping
the StreamingContext (ssc.stop), or that it has finished fetching documents
for a batch? If possible, please paste your custom receiver code so that
we can have a look at it.

Thanks
Best Regards

On Tue, Apr 7, 2015 at 8:46 AM, Hari Polisetty  wrote:

>  My application is running Spark in local mode and  I have a Spark
> Streaming Listener as well as a Custom Receiver. When the receiver is done
> fetching all documents, it invokes “stop” on itself.
>
> I see the StreamingListener  getting a callback on “onReceiverStopped”
> where I stop the streaming context.
>
>
> However, I see the following message in my logs:
>
>
> 2015-04-06 16:41:51,193 WARN [Thread-66]
> com.amazon.grcs.gapanalysis.spark.streams.ElasticSearchResponseReceiver.onStop
> - Stopped receiver
>
> 2015-04-06 16:41:51,193 ERROR
> [sparkDriver-akka.actor.default-dispatcher-17]
> org.apache.spark.Logging$class.logError - Deregistered receiver for stream
> 0: AlHURLEY
>
> 2015-04-06 16:41:51,202 WARN [Executor task launch worker-2]
> org.apache.spark.Logging$class.logWarning - Stopped executor without error
>
> 2015-04-06 16:41:51,203 WARN [StreamingListenerBus]
> org.apache.spark.Logging$class.logWarning - All of the receivers have not
> deregistered, Map(0 ->
> ReceiverInfo(0,ElasticSearchResponseReceiver-0,null,false,localhost,HURLEY))
>
>
> What am I missing or doing wrong?
>


Re: set spark.storage.memoryFraction to 0 when no cached RDD and memory area for broadcast value?

2015-04-13 Thread Akhil Das
You could try leaving all the configuration values at their defaults, running
your application, and seeing whether you still hit the heap issue. If so, try
adding swap space to the machines, which may help. Another way would be to
set the heap size manually (export _JAVA_OPTIONS="-Xmx5g").
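To get a feel for how much of the heap the storage region takes, here is a rough sketch of the sizing arithmetic. It assumes, as far as I know for Spark 1.x, that the region is heap * spark.storage.memoryFraction * spark.storage.safetyFraction, with defaults of 0.6 and 0.9; please check both against the configuration docs for your version. Note also that the stack trace quoted below passes through MemoryStore while reading the broadcast, which suggests that broadcast blocks use this same region.

```java
class StorageMemorySketch {

    // Assumed formula: heap * memoryFraction * safetyFraction.
    static long storageBytes(long maxHeapBytes, double memoryFraction, double safetyFraction) {
        return (long) (maxHeapBytes * memoryFraction * safetyFraction);
    }

    public static void main(String[] args) {
        long heap = 5L * 1024 * 1024 * 1024; // -Xmx5g, as in the export above

        // With the assumed defaults, roughly 2.7 GiB is set aside for storage.
        System.out.println(storageBytes(heap, 0.6, 0.9));

        // With memoryFraction set to 0, nothing is left for storage.
        System.out.println(storageBytes(heap, 0.0, 0.9));
    }
}
```

If broadcast values really do live in this region, setting the fraction to 0 would work against a job that reads large broadcast values, so lowering it moderately seems safer than turning it off.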

Thanks
Best Regards

On Wed, Apr 8, 2015 at 12:45 AM, Shuai Zheng  wrote:

> Hi All,
>
>
>
> I am a bit confused on spark.storage.memoryFraction, this is used to set
> the area for RDD usage, will this RDD means only for cached and persisted
> RDD? So if my program has no cached RDD at all (means that I have no
> .cache() or .persist() call on any RDD), then I can set this
> spark.storage.memoryFraction to a very small number or even zero?
>
>
>
> I am writing a program which consume a lot of memory (broadcast value,
> runtime, etc). But I have no cached RDD, so should I just turn off this
> spark.storage.memoryFraction to 0 (which will help me to improve the
> performance)?
>
>
>
> And I have another issue on the broadcast, when I try to get a broadcast
> value, it throws me out of memory error, which part of memory should I
> allocate more (if I can’t increase my overall memory size).
>
>
>
> java.lang.OutOfMemoryError: Java heap space
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:218)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:200)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>     at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>     at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>     at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
>     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:549)
>     at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:431)
>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
>     at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>     at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>     at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>     at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>
>
>
>
> Regards,
>
>
>
> Shuai
>


Re: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]

2015-04-13 Thread Akhil Das
Just make sure you import the following:

import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._



Thanks
Best Regards

On Wed, Apr 8, 2015 at 6:38 AM, Su She  wrote:

> Hello Everyone,
>
> I am trying to implement this example (Spark Streaming with Twitter).
>
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
>
> I am able to do:
>
> hashTags.print() to get a live stream of filtered hashtags, but I get
> these warnings, not sure if they're related to the error:
>
> WARN BlockManager: Block input-0-1428450594600 replicated to only 0
> peer(s) instead of 1 peers
>
> then when I try to print out topCounts60 or topCounts10, I get this
> error when building:
>
>
>  
> /home/ec2-user/sparkApps/TwitterApp/src/main/scala/TwitterPopularTags.scala:35:
> error: value reduceByKeyAndWindow is not a member of
> org.apache.spark.streaming.dstream.DStream[(String, Int)]
> [INFO] val topCounts60 = hashTags.map((_,
> 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count)
> => (count, topic)}.transform(_.sortByKey(false))
>
>
> Thank you for the help!
>
> Best,
>
> Su
>
>