Re: Having lots of FetchFailedException in join

2015-03-05 Thread Jianshi Huang
Thanks. I was about to submit a ticket for this :)

Also there's a ticket for sort-merge-based groupByKey
https://issues.apache.org/jira/browse/SPARK-3461

BTW, any idea why the run with netty didn't output OOM error messages? It's
very confusing when troubleshooting.


Jianshi
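
A minimal sketch (an illustration, not something taken from this thread's setup) of making executor OOMs easier to surface: forward GC logging and a heap-dump flag to the executors through spark.executor.extraJavaOptions, the same kind of flags mentioned later in the thread.

    val conf = new org.apache.spark.SparkConf()
      .set("spark.executor.extraJavaOptions",
        "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError")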

On Thu, Mar 5, 2015 at 4:01 PM, Shao, Saisai  wrote:

>  I think there are a lot of JIRAs trying to solve this problem (
> https://issues.apache.org/jira/browse/SPARK-5763). Basically sort-merge
> join is a good choice.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 3:55 PM
> *To:* Shao, Saisai
> *Cc:* Cheng, Hao; user
>
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> There's some skew.
>
>
>
> 64 | 6164 | 0 | SUCCESS | PROCESS_LOCAL | 200 / | 2015/03/04 23:45:47 | 1.1 min | 6 s | 198.6 MB | 21.1 GB | 240.8 MB
> 59 | 6159 | 0 | SUCCESS | PROCESS_LOCAL | 30 / | 2015/03/04 23:45:47 | 44 s | 5 s | 200.7 MB | 4.8 GB | 154.0 MB
>
> But I expect this kind of skewness to be quite common.
>
>
>
> Jianshi
>
>
>
>
>
> On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang 
> wrote:
>
>  I see. I'm using core's join. The data might have some skewness
> (checking).
>
>
>
>  I understand shuffle can spill data to disk, but when consuming it, say in
> cogroup or groupByKey, it still needs to read a whole group's elements,
> right? I guess the OOM happened there when reading very large groups.
>
>
>
> Jianshi
>
>
>
> On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai 
> wrote:
>
>  I think what you could do is to monitor through web UI to see if there’s
> any skew or other symptoms in shuffle write and read. For GC you could use
> the below configuration as you mentioned.
>
>
>
> From the Spark core side, all the shuffle-related operations can spill the
> data to disk and do not need to read the whole partition into memory. But if
> you use SparkSQL, it depends on how SparkSQL uses these operators.
>
>
>
> CC @hao if he has some thoughts on it.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 3:28 PM
> *To:* Shao, Saisai
>
>
> *Cc:* user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> Hi Saisai,
>
>
>
> What's your suggested settings on monitoring shuffle? I've
> enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.
>
>
>
> I found SPARK-3461 (Support external groupByKey using
> repartitionAndSortWithinPartitions), which wants to make groupByKey use
> external storage. It's still in open status. Does that mean that for now
> groupByKey/cogroup/join (implemented as cogroup + flatMap) will still read
> the group as a whole when consuming it?
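
A hedged sketch of the idea behind SPARK-3461 (not its actual implementation): repartitionAndSortWithinPartitions, available on pair RDDs since 1.2, lines a key's values up contiguously so each group can be consumed as a stream instead of being buffered whole. The input RDD here is hypothetical.

    import org.apache.spark.SparkContext._
    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD

    def countPerKey(pairs: RDD[(String, Int)]): RDD[(String, Long)] = {
      val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(200))
      sorted.mapPartitions { it =>
        val buffered = it.buffered
        new Iterator[(String, Long)] {
          def hasNext = buffered.hasNext
          def next() = {
            val key = buffered.head._1
            var count = 0L
            // consume the contiguous run of records that share this key
            while (buffered.hasNext && buffered.head._1 == key) { buffered.next(); count += 1 }
            (key, count)
          }
        }
      }
    }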
>
>
>
> How can I deal with the key skewness in joins? Is there a skew-join
> implementation?
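
One common workaround, sketched here with hypothetical inputs rather than a built-in skew join: salt the skewed side's keys and replicate the other side, so a hot key is spread over several reduce tasks.

    import scala.util.Random
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    def saltedJoin(big: RDD[(String, String)], small: RDD[(String, String)], n: Int = 10) = {
      val saltedBig  = big.map { case (k, v) => ((k, Random.nextInt(n)), v) }            // skewed side
      val replicated = small.flatMap { case (k, v) => (0 until n).map(i => ((k, i), v)) }
      saltedBig.join(replicated).map { case ((k, _), vs) => (k, vs) }                    // strip the salt
    }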
>
>
>
>
>
> Jianshi
>
>
>
>
>
>
>
> On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai 
> wrote:
>
>  Hi Jianshi,
>
>
>
> From my understanding, it may not be a problem with NIO or Netty; looking
> at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap).
> Theoretically EAOM can spill the data to disk if memory is not enough,
> but there might be some issues when the join key is skewed or the number of
> keys is small, so you hit OOM.
>
>
>
> Maybe you could monitor each stage's or task's shuffle and GC status, as well
> as system status, to identify the problem.
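
A small hedged sketch of a complementary check, done from the RDD side rather than the web UI: count records per join key and look at the heaviest keys (the input RDD here is hypothetical).

    import org.apache.spark.SparkContext._

    val keyed: org.apache.spark.rdd.RDD[(String, String)] = ???   // one side of the join
    val heaviest = keyed
      .mapValues(_ => 1L)
      .reduceByKey(_ + _)
      .top(20)(Ordering.by[(String, Long), Long](_._2))           // 20 most frequent keys
    heaviest.foreach(println)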
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 2:32 PM
> *To:* Aaron Davidson
> *Cc:* user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> One really interesting thing is that when I'm using the
> netty-based spark.shuffle.blockTransferService, there are no OOM error
> messages (java.lang.OutOfMemoryError: Java heap space).
>
>
>
> Any idea why it's not here?
>
>
>
> I'm using Spark 1.2.1.
>
>
>
> Jianshi
>
>
>
> On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
> wrote:
>
>  I changed spark.shuffle.blockTransferService to nio and now I'm getting
> OOM errors; I'm doing a big join operation.
>
>
>
>
>
> 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
> (TID 6207)
>
> java.lang.OutOfMemoryError: Java heap space
>
> at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
> at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
> at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
> at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
> at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
> at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
> at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
> at org.apache.spark.uti

Re: Which OutputCommitter to use for S3?

2015-03-05 Thread Pei-Lun Lee
Thanks for the DirectOutputCommitter example.
However I found it only works for saveAsHadoopFile. What about
saveAsParquetFile?
It looks like SparkSQL is using ParquetOutputCommitter, which is a subclass
of FileOutputCommitter.
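
For the old-API path (saveAsHadoopFile), later replies in this thread show that the committer can be swapped through the Hadoop configuration; a hedged sketch, reusing the class name quoted below:

    val conf = new org.apache.spark.SparkConf()
      .set("spark.hadoop.mapred.output.committer.class",
        "com.elsevier.common.DirectOutputCommitter")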

On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor 
wrote:

> FYI. We're currently addressing this at the Hadoop level in
> https://issues.apache.org/jira/browse/HADOOP-9565
>
>
> Thomas Demoor
>
> On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath <
> ddmcbe...@yahoo.com.invalid> wrote:
>
>> Just to close the loop in case anyone runs into the same problem I had.
>>
>> By setting --hadoop-major-version=2 when using the ec2 scripts,
>> everything worked fine.
>>
>> Darin.
>>
>>
>> - Original Message -
>> From: Darin McBeath 
>> To: Mingyu Kim ; Aaron Davidson 
>> Cc: "user@spark.apache.org" 
>> Sent: Monday, February 23, 2015 3:16 PM
>> Subject: Re: Which OutputCommitter to use for S3?
>>
>> Thanks.  I think my problem might actually be the other way around.
>>
>> I'm compiling with hadoop 2,  but when I startup Spark, using the ec2
>> scripts, I don't specify a
>> -hadoop-major-version and the default is 1.   I'm guessing that if I make
>> that a 2 that it might work correctly.  I'll try it and post a response.
>>
>>
>> - Original Message -
>> From: Mingyu Kim 
>> To: Darin McBeath ; Aaron Davidson <
>> ilike...@gmail.com>
>> Cc: "user@spark.apache.org" 
>> Sent: Monday, February 23, 2015 3:06 PM
>> Subject: Re: Which OutputCommitter to use for S3?
>>
>> Cool, we will start from there. Thanks Aaron and Josh!
>>
>> Darin, it's likely because the DirectOutputCommitter is compiled with
>> Hadoop 1 classes and you're running it with Hadoop 2.
>> org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
>> became an interface in Hadoop 2.
>>
>> Mingyu
>>
>>
>>
>>
>>
>> On 2/23/15, 11:52 AM, "Darin McBeath" 
>> wrote:
>>
>> >Aaron.  Thanks for the class. Since I'm currently writing Java based
>> >Spark applications, I tried converting your class to Java (it seemed
>> >pretty straightforward).
>> >
>> >I set up the use of the class as follows:
>> >
>> >SparkConf conf = new SparkConf()
>> >.set("spark.hadoop.mapred.output.committer.class",
>> >"com.elsevier.common.DirectOutputCommitter");
>> >
>> >And I then try and save a file to S3 (which I believe should use the old
>> >hadoop apis).
>> >
>> >JavaPairRDD newBaselineRDDWritable =
>> >reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
>> >newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
>> >Text.class, Text.class, SequenceFileOutputFormat.class,
>> >org.apache.hadoop.io.compress.GzipCodec.class);
>> >
>> >But, I get the following error message.
>> >
>> >Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
>> >class org.apache.hadoop.mapred.JobContext, but interface was expected
>> >at
>>
>> >com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
>> >java:68)
>> >at
>> >org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
>> >at
>>
>> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
>> >.scala:1075)
>> >at
>>
>> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>> >ala:940)
>> >at
>>
>> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>> >ala:902)
>> >at
>>
>> >org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
>> >71)
>> >at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
>> >
>> >In my class, JobContext is an interface of  type
>> >org.apache.hadoop.mapred.JobContext.
>> >
>> >Is there something obvious that I might be doing wrong (or messed up in
>> >the translation from Scala to Java) or something I should look into?  I'm
>> >using Spark 1.2 with hadoop 2.4.
>> >
>> >
>> >Thanks.
>> >
>> >Darin.
>> >
>> >
>> >
>> >
>> >
>> >From: Aaron Davidson 
>> >To: Andrew Ash 
>> >Cc: Josh Rosen ; Mingyu Kim ;
>> >"user@spark.apache.org" ; Aaron Davidson
>> >
>> >Sent: Saturday, February 21, 2015 7:01 PM
>> >Subject: Re: Which OutputCommitter to use for S3?
>> >
>> >
>> >
>> >Here is the class:
>> >
>> >https://gist.github.com/aarondav/c513916e72101bbe14ec
>> >
>> >You can use it by setting "mapred.output.committer.class" in the Hadoop
>> >configuration (or "spark.hadoop.mapred.output.committer.class" in the
>> >Spark configuration). Note that this only works for the old Hadoop APIs,
>> >I believe the new Hadoop APIs strongly tie committer to input format (so
>> >FileInputFormat always uses FileOutputCommitter), which makes this fix
>> >more difficult to apply.
>> >
>> >
>> >
>> >
>> >On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash 
>> w

Re: Which OutputCommitter to use for S3?

2015-03-05 Thread Aaron Davidson
Yes, unfortunately that direct dependency makes this injection much more
difficult for saveAsParquetFile.

On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee  wrote:

> Thanks for the DirectOutputCommitter example.
> However I found it only works for saveAsHadoopFile. What about
> saveAsParquetFile?
> It looks like SparkSQL is using ParquetOutputCommitter, which is subclass
> of FileOutputCommitter.
>
> On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor <
> thomas.dem...@amplidata.com>
> wrote:
>
> > FYI. We're currently addressing this at the Hadoop level in
> > https://issues.apache.org/jira/browse/HADOOP-9565
> >
> >
> > Thomas Demoor
> >
> > On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath <
> > ddmcbe...@yahoo.com.invalid> wrote:
> >
> >> Just to close the loop in case anyone runs into the same problem I had.
> >>
> >> By setting --hadoop-major-version=2 when using the ec2 scripts,
> >> everything worked fine.
> >>
> >> Darin.
> >>
> >>
> >> - Original Message -
> >> From: Darin McBeath 
> >> To: Mingyu Kim ; Aaron Davidson 
> >> Cc: "user@spark.apache.org" 
> >> Sent: Monday, February 23, 2015 3:16 PM
> >> Subject: Re: Which OutputCommitter to use for S3?
> >>
> >> Thanks.  I think my problem might actually be the other way around.
> >>
> >> I'm compiling with hadoop 2,  but when I startup Spark, using the ec2
> >> scripts, I don't specify a
> >> -hadoop-major-version and the default is 1.   I'm guessing that if I
> make
> >> that a 2 that it might work correctly.  I'll try it and post a response.
> >>
> >>
> >> - Original Message -
> >> From: Mingyu Kim 
> >> To: Darin McBeath ; Aaron Davidson <
> >> ilike...@gmail.com>
> >> Cc: "user@spark.apache.org" 
> >> Sent: Monday, February 23, 2015 3:06 PM
> >> Subject: Re: Which OutputCommitter to use for S3?
> >>
> >> Cool, we will start from there. Thanks Aaron and Josh!
> >>
> >> Darin, it's likely because the DirectOutputCommitter is compiled with
> >> Hadoop 1 classes and you're running it with Hadoop 2.
> >> org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and
> it
> >> became an interface in Hadoop 2.
> >>
> >> Mingyu
> >>
> >>
> >>
> >>
> >>
> >> On 2/23/15, 11:52 AM, "Darin McBeath" 
> >> wrote:
> >>
> >> >Aaron.  Thanks for the class. Since I'm currently writing Java based
> >> >Spark applications, I tried converting your class to Java (it seemed
> >> >pretty straightforward).
> >> >
> >> >I set up the use of the class as follows:
> >> >
> >> >SparkConf conf = new SparkConf()
> >> >.set("spark.hadoop.mapred.output.committer.class",
> >> >"com.elsevier.common.DirectOutputCommitter");
> >> >
> >> >And I then try and save a file to S3 (which I believe should use the
> old
> >> >hadoop apis).
> >> >
> >> >JavaPairRDD newBaselineRDDWritable =
> >> >reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
> >> >newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
> >> >Text.class, Text.class, SequenceFileOutputFormat.class,
> >> >org.apache.hadoop.io.compress.GzipCodec.class);
> >> >
> >> >But, I get the following error message.
> >> >
> >> >Exception in thread "main" java.lang.IncompatibleClassChangeError:
> Found
> >> >class org.apache.hadoop.mapred.JobContext, but interface was expected
> >> >at
> >>
> >>
> >com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
> >> >java:68)
> >> >at
> >>
> >org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
> >> >at
> >>
> >>
> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
> >> >.scala:1075)
> >> >at
> >>
> >>
> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
> >> >ala:940)
> >> >at
> >>
> >>
> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
> >> >ala:902)
> >> >at
> >>
> >>
> >org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
> >> >71)
> >> >at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
> >> >
> >> >In my class, JobContext is an interface of  type
> >> >org.apache.hadoop.mapred.JobContext.
> >> >
> >> >Is there something obvious that I might be doing wrong (or messed up in
> >> >the translation from Scala to Java) or something I should look into?
> I'm
> >> >using Spark 1.2 with hadoop 2.4.
> >> >
> >> >
> >> >Thanks.
> >> >
> >> >Darin.
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >From: Aaron Davidson 
> >> >To: Andrew Ash 
> >> >Cc: Josh Rosen ; Mingyu Kim ;
> >> >"user@spark.apache.org" ; Aaron Davidson
> >> >
> >> >Sent: Saturday, February 21, 2015 7:01 PM
> >> >Subject: Re: Which OutputCommitter to use for S3?
> >> >
> >> >
> >> >
> >> >Here is the class:
> >> >
> >>
> >> >https://gist.github.com/aarondav/c513916e72101bbe14ec

Re: Having lots of FetchFailedException in join

2015-03-05 Thread Aaron Davidson
However, Executors were dying when using Netty as well, so it is possible
that the OOM was occurring then too. It is also possible only one of your
Executors OOMs (due to a particularly large task) and the others display
various exceptions while trying to fetch the shuffle blocks from the failed
executor.

I cannot explain the local FileNotFoundExceptions occurring on machines that
were not throwing fatal errors, though -- typically I have only seen that
happen when a fatal error (e.g., OOM) was thrown on an Executor, causing it
to begin the termination process which involves deleting its own shuffle
files. It may then throw the FNF if other Executors request those files
before it has completed its shutdown (and will throw a ConnectionFailed
once it's completed terminating).

On Thu, Mar 5, 2015 at 12:19 AM, Shao, Saisai  wrote:

>  I’ve no idea why Netty didn’t hit the OOM issue; one possibility is that
> Netty uses direct memory to store each block, whereas NIO uses on-heap
> memory, so Netty occupies less on-heap memory than NIO.
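
The two transfer services being compared in this thread are toggled with a single setting in Spark 1.2.x; a hedged sketch:

    val conf = new org.apache.spark.SparkConf()
      .set("spark.shuffle.blockTransferService", "nio")   // or "netty", the 1.2 default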
>
>
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 4:14 PM
>
> *To:* Shao, Saisai
> *Cc:* Cheng, Hao; user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> Thanks. I was about to submit a ticket for this :)
>
>
>
> Also there's a ticket for sort-merge based groupbykey
> https://issues.apache.org/jira/browse/SPARK-3461
>
>
>
> BTW, any idea why run with netty didn't output OOM error messages? It's
> very confusing in troubleshooting.
>
>
>
>
>
> Jianshi
>
>
>
> On Thu, Mar 5, 2015 at 4:01 PM, Shao, Saisai 
> wrote:
>
>  I think there’s a lot of JIRA trying to solve this problem (
> https://issues.apache.org/jira/browse/SPARK-5763). Basically sort merge
> join is a good choice.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 3:55 PM
> *To:* Shao, Saisai
> *Cc:* Cheng, Hao; user
>
>
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> There're some skew.
>
>
>
> 64 | 6164 | 0 | SUCCESS | PROCESS_LOCAL | 200 / | 2015/03/04 23:45:47 | 1.1 min | 6 s | 198.6 MB | 21.1 GB | 240.8 MB
> 59 | 6159 | 0 | SUCCESS | PROCESS_LOCAL | 30 / | 2015/03/04 23:45:47 | 44 s | 5 s | 200.7 MB | 4.8 GB | 154.0 MB
>
> But I expect this kind of skewness to be quite common.
>
>
>
> Jianshi
>
>
>
>
>
> On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang 
> wrote:
>
>  I see. I'm using core's join. The data might have some skewness
> (checking).
>
>
>
> I understand shuffle can spill data to disk but when consuming it, say in
> cogroup or groupByKey, it still needs to read the whole group elements,
> right? I guess OOM happened there when reading very large groups.
>
>
>
> Jianshi
>
>
>
> On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai 
> wrote:
>
>  I think what you could do is to monitor through web UI to see if there’s
> any skew or other symptoms in shuffle write and read. For GC you could use
> the below configuration as you mentioned.
>
>
>
> From Spark core side, all the shuffle related operations can spill the
> data into disk and no need to read the whole partition into memory. But if
> you uses SparkSQL, it depends on how SparkSQL uses this operators.
>
>
>
> CC @hao if he has some thoughts on it.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 3:28 PM
> *To:* Shao, Saisai
>
>
> *Cc:* user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> Hi Saisai,
>
>
>
> What's your suggested settings on monitoring shuffle? I've
> enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.
>
>
>
> I found SPARK-3461 (Support external groupByKey using
> repartitionAndSortWithinPartitions) want to make groupByKey using external
> storage. It's still open status. Does that mean now
> groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read
> the group as a whole during consuming?
>
>
>
> How can I deal with the key skewness in joins? Is there a skew-join
> implementation?
>
>
>
>
>
> Jianshi
>
>
>
>
>
>
>
> On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai 
> wrote:
>
>  Hi Jianshi,
>
>
>
> From my understanding, it may not be the problem of NIO or Netty, looking
> at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap),
> theoretically EAOM can spill the data into disk if memory is not enough,
> but there might some issues when join key is skewed or key number is
> smaller, so you will meet OOM.
>
>
>
> Maybe you could monitor each stage or task’s shuffle and GC status also
> system status to identify the problem.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 2:32 PM
> *To:* Aaron Davidson
> *Cc:* user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> One really interesting is that when I'm using 

Re: using log4j2 with spark

2015-03-05 Thread Akhil Das
You may exclude the log4j dependency while building. You can have a look at
this build file to see how to exclude libraries
http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/missing_dependencies_in_jar_files.html
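
A hedged build.sbt fragment along those lines (group and artifact names are real; version and scope are illustrative): keep the log4j 1.2 binding out of the application's own dependency tree so a log4j2 binding can be supplied instead. Note this only covers the application classpath, not the pre-built spark-assembly jar.

    libraryDependencies += ("org.apache.spark" %% "spark-core" % "1.2.1" % "provided")
      .excludeAll(
        ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"),
        ExclusionRule(organization = "log4j", name = "log4j")
      )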

Thanks
Best Regards

On Thu, Mar 5, 2015 at 1:20 PM, Lior Chaga  wrote:

> Hi,
> Trying to run spark 1.2.1 w/ hadoop 1.0.4 on cluster and configure it to
> run with log4j2.
> The problem is that spark-assembly.jar contains log4j and slf4j classes
> compatible with log4j 1.2, so Spark detects that it should use log4j 1.2 (
> https://github.com/apache/spark/blob/54e7b456dd56c9e52132154e699abca87563465b/core/src/main/scala/org/apache/spark/Logging.scala
> on line 121).
>
> Is there a maven profile for building spark-assembly w/out the log4j
> dependencies, or any other way I can force spark to use log4j2?
>
> Thanks!
> Lior
>


Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Akhil Das
When you use KafkaUtils.createStream with StringDecoders, it will return
String objects inside your messages stream. To access the elements from the
json, you could do something like the following:

   // needs the jackson-module-scala dependency on the classpath
   import com.fasterxml.jackson.databind.ObjectMapper
   import com.fasterxml.jackson.module.scala.DefaultScalaModule
   import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

   // messages is a DStream of (key, value) pairs; parse each value and pull out "time"
   val mapStream = messages.map { case (_, json) =>
     val mapper = new ObjectMapper() with ScalaObjectMapper
     mapper.registerModule(DefaultScalaModule)
     mapper.readValue[Map[String, Any]](json).get("time")
   }



Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin  wrote:

>   Friends,
>
>   I'm trying to parse json formatted Kafka messages and then send back to
> cassandra.I have two problems:
>
>1. I got the exception below. How to check an empty RDD?
>
>  Exception in thread "main" java.lang.UnsupportedOperationException:
> empty collection
>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>  at scala.Option.getOrElse(Option.scala:120)
>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>
>  val messages = KafkaUtils.createStream[String, String, StringDecoder, 
> StringDecoder](…)
>
> messages.foreachRDD { rdd =>
>   val message:RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
> .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
> "msg"))
> }
>
>
>  2. how to get all column names from json messages? I have hundreds of
> columns in the json formatted message.
>
>  Thanks for your help!
>
>
>
>
>  Best regards,
>
>  Cui Lin
>


Connection PHP application to Spark Sql thrift server

2015-03-05 Thread fanooos
We have two applications need to connect to Spark Sql thrift server. 

The first application is developed in Java. With the Spark SQL thrift server
running, we followed the steps in this link

and the application connected smoothly without any problem.


The second application is developed in PHP. We followed the steps provided
in this link.

When hitting the PHP script, the Spark SQL thrift server throws this
exception.

15/03/05 11:53:19 ERROR TThreadPoolServer: Error occurred during processing
of message.
java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
at
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.thrift.transport.TTransportException
at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at
org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:182)
at
org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125)
at 
org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at
org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
... 4 more


I have searched a lot about this exception but I cannot figure out what the
problem is.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Connection-PHP-application-to-Spark-Sql-thrift-server-tp21925.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Identify the performance bottleneck from hardware prospective

2015-03-05 Thread davidkl
Hello Julaiti,

Maybe I am just asking the obvious :-) but did you check disk IO? Depending
on what you are doing that could be the bottleneck.

In my case none of the HW resources was a bottleneck; instead it was the use
of some distributed features that were blocking execution (e.g. Hazelcast).
Could that be your case as well?

Regards



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Identify-the-performance-bottleneck-from-hardware-prospective-tp21684p21927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Extra output from Spark run

2015-03-05 Thread Sean Owen
In the console, you'd find this draws a progress bar illustrating the
current stage's progress. In log files, it shows up as this sort of 'pyramid'
because the carriage return is rendered as a newline.

You can turn it off with spark.ui.showConsoleProgress = false

On Thu, Mar 5, 2015 at 2:11 AM, cjwang  wrote:
> When I run Spark 1.2.1, I found these display that wasn't in the previous
> releases:
>
> [Stage 12:=>   (6 + 1) /
> 16]
> [Stage 12:>(8 + 1) /
> 16]
> [Stage 12:==> (11 + 1) /
> 16]
> [Stage 12:=>  (14 + 1) /
> 16]
>
> What do they mean and how can I get rid of them?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Extra-output-from-Spark-run-tp21920.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Managing permissions when saving as text file

2015-03-05 Thread didmar
Hi,

I'm having a problem involving file permissions on the local filesystem.

On a first machine, I have two different users :
- launcher, which launches my job from an uber jar file
- spark, which runs the master
On a second machine, I have a user spark (same uid/gid as the other) which
runs the worker

Results are written in a shared folder /home/spark/output/ (same path for
both machines) which is owned by spark and has 777 permissions

When I run a job that saves a text file to /home/spark/output/result/,
launcher creates the result folder and subfolders with 775 permissions.
The problem is that the worker, which runs as user spark, cannot write the
results:



I tried to add the sticky bit to /home/spark/output/, but this did not
suffice: it creates a part-0 file and starts filling it, but then the
following error occurs



Is there a way to force Spark (the driver part I suppose ?) to create all
these folders with 777 permissions instead of 775 ? Or maybe there is
another way ?

Thanks,
Didier
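
Not a confirmed fix, just one avenue to try: Spark copies any "spark.hadoop.*" property into the Hadoop Configuration it writes with, and Hadoop consults fs.permissions.umask-mode when creating output directories, so a permissive umask may yield the 777 folders described above.

    val conf = new org.apache.spark.SparkConf()
      .set("spark.hadoop.fs.permissions.umask-mode", "000")  // may or may not apply to the local FS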



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Managing-permissions-when-saving-as-text-file-tp21928.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Nullpointer Exception on broadcast variables (YARN Cluster mode)

2015-03-05 Thread samriddhac
Hi All

I have a simple spark application, where I am trying to broadcast a String
type variable on YARN Cluster.
But every time I try to access the broadcast variable's value, I am
getting null within the Task. It would be really helpful if you could
suggest what I am doing wrong here.
My code is as follows:

public class TestApp implements Serializable{
static Broadcast<String[]> mongoConnectionString;


public static void main( String[] args )
{
String mongoBaseURL = args[0];
SparkConf sparkConf =  new SparkConf().setAppName(Constants.appName);
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

mongoConnectionString = javaSparkContext.broadcast(args);

JavaSQLContext javaSQLContext = new JavaSQLContext(javaSparkContext);

JavaSchemaRDD javaSchemaRDD =
javaSQLContext.jsonFile(hdfsBaseURL+Constants.hdfsInputDirectoryPath);

if(javaSchemaRDD!=null){
javaSchemaRDD.registerTempTable("LogAction");
javaSchemaRDD.cache();
JavaSchemaRDD pageSchemaRDD = javaSQLContext.sql(SqlConstants.getLogActionPage);
pageSchemaRDD.foreach(new Test());

} 
}

private static class Test implements VoidFunction<Row>{
/**
 * 
 */
private static final long serialVersionUID = 1L;

public void call(Row t) throws Exception {
// TODO Auto-generated method stub
logger.info("mongoConnectionString 
"+mongoConnectionString.value());
}
}



Thanks and Regards
Samriddha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Nullpointer-Exception-on-broadcast-variables-YARN-Cluster-mode-tp21929.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Map task in Trident.

2015-03-05 Thread Vladimir Protsenko
There is a map function in Clojure, so you can map one collection to
another.

The most closely resembling operation is *each*; however, when f is applied
to an input tuple we get a tuple with both fields: f(["field-a"]) =
["field-a" "field-b"].

How could I implement the same operation on a Trident stream?


Re: Extra output from Spark run

2015-03-05 Thread davidkl
If you do not want those progress indication to appear, just set
spark.ui.showConsoleProgress to false, e.g:

System.setProperty("spark.ui.showConsoleProgress", "false");

Regards



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extra-output-from-Spark-run-tp21920p21931.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Helena Edelson
Hi Cui,

What version of Spark are you using? There was a bug ticket that may be related 
to this, fixed in core/src/main/scala/org/apache/spark/rdd/RDD.scala and 
merged into versions 1.3.0 and 1.2.1. If you are using 1.1.1 that may be the 
reason, but it’s a stretch: https://issues.apache.org/jira/browse/SPARK-4968

Did you verify that you have data streaming from Kafka?

Helena
https://twitter.com/helenaedelson

On Mar 5, 2015, at 12:43 AM, Cui Lin  wrote:

> Friends,
> 
> I'm trying to parse json formatted Kafka messages and then send back to 
> cassandra.I have two problems:
> I got the exception below. How to check an empty RDD?
> Exception in thread "main" java.lang.UnsupportedOperationException: empty 
> collection
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
> at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
> 
> val messages = KafkaUtils.createStream[String, String, StringDecoder, 
> StringDecoder](…)
> messages.foreachRDD { rdd =>
>   val message:RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
> .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
> "msg"))
> }
> 
> 2. how to get all column names from json messages? I have hundreds of columns 
> in the json formatted message. 
> 
> Thanks for your help!
> 
> 
> 
> 
> Best regards,
> 
> Cui Lin



spark-shell --master yarn-client fail on Windows

2015-03-05 Thread Xi Shen
Hi,

My HDFS and YARN services are started, and my spark-shell can wok in local
mode.

But when I try spark-shell --master yarn-client, a job can be created at
the YARN service, but will fail very soon. The Diagnostics are:

Application application_1425559747310_0002 failed 2 times due to AM
Container for appattempt_1425559747310_0002_02 exited with exitCode: 1
For more detailed output, check application tracking page:
http://Xi-Laptop:8088/proxy/application_1425559747310_0002/Then, click on
links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1425559747310_0002_02_01
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Shell output: 1 file(s) moved.
Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.

And in the AM log, there's something like:

Could not find or load main class
'-Dspark.driver.appUIAddress=http:..:4040'

And it changes from time to time.

It feels like something is not right in YARN.


Thanks,
David


RE: Connection PHP application to Spark Sql thrift server

2015-03-05 Thread Cheng, Hao
Can you run the same query against Hive? Let's first confirm whether the bug is in SparkSQL or in your PHP 
code.

-Original Message-
From: fanooos [mailto:dev.fano...@gmail.com] 
Sent: Thursday, March 5, 2015 4:57 PM
To: user@spark.apache.org
Subject: Connection PHP application to Spark Sql thrift server

We have two applications need to connect to Spark Sql thrift server. 

The first application is developed in java. Having spark sql thrift server 
running, we following the steps in  this link 

and the application connected smoothly without any problem. 


The second application is developed in PHP. We followed the steps provided in  
this link 

.  When hitting the php script, the spark sql thrift server throws this 
exception. 

15/03/05 11:53:19 ERROR TThreadPoolServer: Error occurred during processing of 
message.
java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
at
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.thrift.transport.TTransportException
at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at
org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:182)
at
org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125)
at 
org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at
org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
... 4 more


I searched a lot about this exception but I can not figure out what is the 
problem.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Connection-PHP-application-to-Spark-Sql-thrift-server-tp21925.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark with data on NFS v HDFS

2015-03-05 Thread Ashish Mukherjee
Hello,

I understand Spark can be used with Hadoop or standalone. I have certain
questions related to use of the correct FS for Spark data.

What is the efficiency trade-off in feeding data to Spark from NFS v HDFS?

If one is not using Hadoop, is it still usual to house data in HDFS for
Spark to read from because of better reliability compared to NFS?

Should data be stored on local FS (not NFS) only for Spark jobs which run
on single machine?

Regards,
Ashish


Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Ted Yu
Cui:
You can check messages.partitions.size to determine whether messages is an
empty RDD.

Cheers
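
A hedged sketch of that guard, wrapped around the code from the original post (messages and sqlContext come from there; RDD.isEmpty() only appeared around Spark 1.3, so partitions/take(1) is the usual 1.2-era check):

    messages.foreachRDD { rdd =>
      val msgs = rdd.map(_._2)
      if (msgs.partitions.nonEmpty && !msgs.take(1).isEmpty) {
        val schemaRdd = sqlContext.jsonRDD(msgs)
        schemaRdd.printSchema()                  // also answers question 2: lists every inferred column
        schemaRdd.registerTempTable("tempTable")
      }
    }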

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das 
wrote:

> When you use KafkaUtils.createStream with StringDecoders, it will return
> String objects inside your messages stream. To access the elements from the
> json, you could do something like the following:
>
>
>val mapStream = messages.map(x=> {
>   val mapper = new ObjectMapper() with ScalaObjectMapper
>   mapper.registerModule(DefaultScalaModule)
>
>   mapper.readValue[Map[String,Any]](x)*.get("time")*
> })
>
>
>
> Thanks
> Best Regards
>
> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin  wrote:
>
>>   Friends,
>>
>>   I'm trying to parse json formatted Kafka messages and then send back
>> to cassandra.I have two problems:
>>
>>1. I got the exception below. How to check an empty RDD?
>>
>>  Exception in thread "main" java.lang.UnsupportedOperationException:
>> empty collection
>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>  at scala.Option.getOrElse(Option.scala:120)
>>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>>
>>  val messages = KafkaUtils.createStream[String, String, StringDecoder, 
>> StringDecoder](…)
>>
>> messages.foreachRDD { rdd =>
>>   val message:RDD[String] = rdd.map { y => y._2 }
>>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>>   sqlContext.sql("SELECT time,To FROM tempTable")
>> .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
>> "msg"))
>> }
>>
>>
>>  2. how to get all column names from json messages? I have hundreds of
>> columns in the json formatted message.
>>
>>  Thanks for your help!
>>
>>
>>
>>
>>  Best regards,
>>
>>  Cui Lin
>>
>
>


Partitioning Dataset and Using Reduce in Apache Spark

2015-03-05 Thread raggy
I am trying to use Apache spark to load up a file, and distribute the file to
several nodes in my cluster and then aggregate the results and obtain them.
I don't quite understand how to do this.

From my understanding the reduce action enables Spark to combine the results
from different nodes and aggregate them together. Am I understanding this
correctly?

From a programming perspective, I don't understand how I would code this
reduce function.

How exactly do I partition the main dataset into N pieces and ask them to be
parallel processed by using a list of transformations?

reduce is supposed to take in two elements and a function for combining
them. Are these 2 elements supposed to be RDDs from the context of Spark or
can they be any type of element? Also, if you have N different partitions
running parallel, how would reduce aggregate all their results into one
final result(since the reduce function aggregates only 2 elements)?

Also, I don't understand this example. The example from the spark website
uses reduce, but I don't see the data being processed in parallel. So, what
is the point of the reduce? If I could get a detailed explanation of the
loop in this example, I think that would clear up most of my questions.

class ComputeGradient extends Function<DataPoint, Vector> {
  private Vector w;
  ComputeGradient(Vector w) { this.w = w; }
  public Vector call(DataPoint p) {
return p.x.times(p.y * (1 / (1 + Math.exp(w.dot(p.x))) - 1));
  }
}

JavaRDD<DataPoint> points = spark.textFile(...).map(new ParsePoint()).cache();
Vector w = Vector.random(D); // current separating plane
for (int i = 0; i < ITERATIONS; i++) {
  Vector gradient = points.map(new ComputeGradient(w)).reduce(new
AddVectors());
  w = w.subtract(gradient);
}
System.out.println("Final separating plane: " + w);

Also, I have been trying to find the source code for reduce from the Apache
Spark Github, but the source is pretty huge and I haven't been able to
pinpoint it. Could someone please direct me towards which file I could find
it in?
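
A minimal, hedged Scala sketch of the semantics being asked about: the two arguments to the reduce function are ordinary elements, not RDDs; Spark first folds the elements inside each partition, then merges the per-partition results, so the same associative function serves both levels. (As for the source, RDD.reduce is defined in core/src/main/scala/org/apache/spark/rdd/RDD.scala.)

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("reduce-demo"))
    val nums  = sc.parallelize(1 to 1000, 4)   // explicitly split into 4 partitions
    val total = nums.reduce(_ + _)             // partial sums per partition, then combined
    println(total)                             // 500500
    sc.stop()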



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Partitioning-Dataset-and-Using-Reduce-in-Apache-Spark-tp21933.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Helena Edelson
Great point :) Cui, Here’s a cleaner way than I had before, w/out the use of 
spark sql for the mapping:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
  .map{ case (k,v) => JsonParser.parse(v).extract[MonthlyCommits]}
  .saveToCassandra("githubstats","monthly_commits")


HELENA EDELSON
Senior Software Engineer,  DSE Analytics 

  

On Mar 5, 2015, at 9:33 AM, Ted Yu  wrote:

> Cui:
> You can check messages.partitions.size to determine whether messages is an 
> empty RDD.
> 
> Cheers
> 
> On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das  wrote:
> When you use KafkaUtils.createStream with StringDecoders, it will return 
> String objects inside your messages stream. To access the elements from the 
> json, you could do something like the following:
> 
> 
>val mapStream = messages.map(x=> {
>   val mapper = new ObjectMapper() with ScalaObjectMapper
>   mapper.registerModule(DefaultScalaModule)
> 
>   mapper.readValue[Map[String,Any]](x).get("time")
> })
> 
>   
> 
> Thanks
> Best Regards
> 
> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin  wrote:
> Friends,
> 
> I'm trying to parse json formatted Kafka messages and then send back to 
> cassandra.I have two problems:
> I got the exception below. How to check an empty RDD?
> Exception in thread "main" java.lang.UnsupportedOperationException: empty 
> collection
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
> at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
> 
> val messages = KafkaUtils.createStream[String, String, StringDecoder, 
> StringDecoder](…)
> messages.foreachRDD { rdd =>
>   val message:RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
> .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
> "msg"))
> }
> 
> 2. how to get all column names from json messages? I have hundreds of columns 
> in the json formatted message. 
> 
> Thanks for your help!
> 
> 
> 
> 
> Best regards,
> 
> Cui Lin
> 
> 



Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-05 Thread Todd Nist
I am running Spark on a HortonWorks HDP cluster. I have deployed the
prebuilt version, but it is only for Spark 1.2.0, not 1.2.1, and there are a
few fixes and features in 1.2.1 that I would like to leverage.

I just downloaded the spark-1.2.1 source and built it to support Hadoop 2.6
by doing the following:

radtech:spark-1.2.1 tnist$ ./make-distribution.sh --name hadoop2.6
--tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive
-Phive-thriftserver -DskipTests clean package

When I deploy this to my hadoop cluster and kick of a spark-shell,

$> spark-1.2.1-bin-hadoop2.6]# ./bin/spark-shell --master yarn-client
--driver-memory 512m --executor-memory 512m

Results in  java.lang.NoClassDefFoundError:
org/codehaus/jackson/map/deser/std/StdDeserializer

The full stack trace is below. I have validated that
$SPARK_HOME/lib/spark-assembly-1.2.1-hadoop2.6.0.jar does in fact contain
the class in question:

jar -tvf spark-assembly-1.2.1-hadoop2.6.0.jar | grep
'org/codehaus/jackson/map/deser/std'
...
 18002 Thu Mar 05 11:23:04 EST 2015
parquet/org/codehaus/jackson/map/deser/std/StdDeserializer.class
  1584 Thu Mar 05 11:23:04 EST 2015
parquet/org/codehaus/jackson/map/deser/std/StdKeyDeserializer$BoolKD.class...

Any guidance on what I missed? If I start the spark-shell in standalone mode
($SPARK_HOME/bin/spark-shell) it comes up fine, so it looks to be related to
starting it under YARN from what I can tell.

TIA for the assistance.

-Todd
Stack Trace

15/03/05 12:12:38 INFO spark.SecurityManager: Changing view acls to:
root15/03/05 12:12:38 INFO spark.SecurityManager: Changing modify acls
to: root15/03/05 12:12:38 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(root); users with modify permissions:
Set(root)15/03/05 12:12:38 INFO spark.HttpServer: Starting HTTP
Server15/03/05 12:12:39 INFO server.Server:
jetty-8.y.z-SNAPSHOT15/03/05 12:12:39 INFO server.AbstractConnector:
Started SocketConnector@0.0.0.0:3617615/03/05 12:12:39 INFO
util.Utils: Successfully started service 'HTTP class server' on port
36176.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
  /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.15/03/05 12:12:43 INFO
spark.SecurityManager: Changing view acls to: root15/03/05 12:12:43
INFO spark.SecurityManager: Changing modify acls to: root15/03/05
12:12:43 INFO spark.SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root);
users with modify permissions: Set(root)15/03/05 12:12:44 INFO
slf4j.Slf4jLogger: Slf4jLogger started15/03/05 12:12:44 INFO Remoting:
Starting remoting15/03/05 12:12:44 INFO Remoting: Remoting started;
listening on addresses
:[akka.tcp://sparkdri...@hadoopdev01.opsdatastore.com:50544]15/03/05
12:12:44 INFO util.Utils: Successfully started service 'sparkDriver'
on port 50544.15/03/05 12:12:44 INFO spark.SparkEnv: Registering
MapOutputTracker15/03/05 12:12:44 INFO spark.SparkEnv: Registering
BlockManagerMaster15/03/05 12:12:44 INFO storage.DiskBlockManager:
Created local directory at
/tmp/spark-16402794-cc1e-42d0-9f9c-99f15eaa1861/spark-118bc6af-4008-45d7-a22f-491bcd1856c015/03/05
12:12:44 INFO storage.MemoryStore: MemoryStore started with capacity
265.4 MB15/03/05 12:12:45 WARN util.NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes
where applicable15/03/05 12:12:45 INFO spark.HttpFileServer: HTTP File
server directory is
/tmp/spark-5d7da34c-58d4-4d60-9b6a-3dce43cab39e/spark-4d65aacb-78bd-40fd-b6c0-53b47e28819915/03/05
12:12:45 INFO spark.HttpServer: Starting HTTP Server15/03/05 12:12:45
INFO server.Server: jetty-8.y.z-SNAPSHOT15/03/05 12:12:45 INFO
server.AbstractConnector: Started
SocketConnector@0.0.0.0:5645215/03/05 12:12:45 INFO util.Utils:
Successfully started service 'HTTP file server' on port 56452.15/03/05
12:12:45 INFO server.Server: jetty-8.y.z-SNAPSHOT15/03/05 12:12:45
INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:404015/03/05 12:12:45 INFO util.Utils:
Successfully started service 'SparkUI' on port 4040.15/03/05 12:12:45
INFO ui.SparkUI: Started SparkUI at
http://hadoopdev01.opsdatastore.com:404015/03/05 12:12:46 INFO
impl.TimelineClientImpl: Timeline service address:
http://hadoopdev02.opsdatastore.com:8188/ws/v1/timeline/
java.lang.NoClassDefFoundError:
org/codehaus/jackson/map/deser/std/StdDeserializer
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.ja

Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-05 Thread Victor Tso-Guillen
That particular class you did find is under parquet/... which means it was
shaded. Did you build your application against a hadoop2.6 dependency? The
maven central repo only has 2.2 but HDP has its own repos.

On Thu, Mar 5, 2015 at 10:04 AM, Todd Nist  wrote:

> I am running Spark on a HortonWorks HDP Cluster. I have deployed there
> prebuilt version but it is only for Spark 1.2.0 not 1.2.1 and there are a
> few fixes and features in there that I would like to leverage.
>
> I just downloaded the spark-1.2.1 source and built it to support Hadoop
> 2.6 by doing the following:
>
> radtech:spark-1.2.1 tnist$ ./make-distribution.sh --name hadoop2.6 --tgz 
> -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
> -DskipTests clean package
>
> When I deploy this to my hadoop cluster and kick of a spark-shell,
>
> $> spark-1.2.1-bin-hadoop2.6]# ./bin/spark-shell --master yarn-client 
> --driver-memory 512m --executor-memory 512m
>
> Results in  java.lang.NoClassDefFoundError:
> org/codehaus/jackson/map/deser/std/StdDeserializer
>
> The full stack trace is below. I have validate that the
> $SPARK_HOME/lib/spark-assembly-1.2.1-hadoop2.6.0.jar does infact contain
> the class in question:
>
> jar -tvf spark-assembly-1.2.1-hadoop2.6.0.jar | grep 
> 'org/codehaus/jackson/map/deser/std'
> ...
>  18002 Thu Mar 05 11:23:04 EST 2015  
> parquet/org/codehaus/jackson/map/deser/std/StdDeserializer.class
>   1584 Thu Mar 05 11:23:04 EST 2015 
> parquet/org/codehaus/jackson/map/deser/std/StdKeyDeserializer$BoolKD.class...
>
> Any guidance on what I missed ? If i start the spark-shell in standalone
> it comes up fine, $SPARK_HOME/bin/spark-shell so it looks to be related
> to starting it under yarn from what I can tell.
>
> TIA for the assistance.
>
> -Todd
> Stack Trace
>
> 15/03/05 12:12:38 INFO spark.SecurityManager: Changing view acls to: 
> root15/03/05 12:12:38 INFO spark.SecurityManager: Changing modify acls to: 
> root15/03/05 12:12:38 INFO spark.SecurityManager: SecurityManager: 
> authentication disabled; ui acls disabled; users with view permissions: 
> Set(root); users with modify permissions: Set(root)15/03/05 12:12:38 INFO 
> spark.HttpServer: Starting HTTP Server15/03/05 12:12:39 INFO server.Server: 
> jetty-8.y.z-SNAPSHOT15/03/05 12:12:39 INFO server.AbstractConnector: Started 
> SocketConnector@0.0.0.0:3617615/03/05 12:12:39 INFO util.Utils: Successfully 
> started service 'HTTP class server' on port 36176.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.2.1
>   /_/
>
> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
> Type in expressions to have them evaluated.
> Type :help for more information.15/03/05 12:12:43 INFO spark.SecurityManager: 
> Changing view acls to: root15/03/05 12:12:43 INFO spark.SecurityManager: 
> Changing modify acls to: root15/03/05 12:12:43 INFO spark.SecurityManager: 
> SecurityManager: authentication disabled; ui acls disabled; users with view 
> permissions: Set(root); users with modify permissions: Set(root)15/03/05 
> 12:12:44 INFO slf4j.Slf4jLogger: Slf4jLogger started15/03/05 12:12:44 INFO 
> Remoting: Starting remoting15/03/05 12:12:44 INFO Remoting: Remoting started; 
> listening on addresses 
> :[akka.tcp://sparkdri...@hadoopdev01.opsdatastore.com:50544]15/03/05 12:12:44 
> INFO util.Utils: Successfully started service 'sparkDriver' on port 
> 50544.15/03/05 12:12:44 INFO spark.SparkEnv: Registering 
> MapOutputTracker15/03/05 12:12:44 INFO spark.SparkEnv: Registering 
> BlockManagerMaster15/03/05 12:12:44 INFO storage.DiskBlockManager: Created 
> local directory at 
> /tmp/spark-16402794-cc1e-42d0-9f9c-99f15eaa1861/spark-118bc6af-4008-45d7-a22f-491bcd1856c015/03/05
>  12:12:44 INFO storage.MemoryStore: MemoryStore started with capacity 265.4 
> MB15/03/05 12:12:45 WARN util.NativeCodeLoader: Unable to load native-hadoop 
> library for your platform... using builtin-java classes where 
> applicable15/03/05 12:12:45 INFO spark.HttpFileServer: HTTP File server 
> directory is 
> /tmp/spark-5d7da34c-58d4-4d60-9b6a-3dce43cab39e/spark-4d65aacb-78bd-40fd-b6c0-53b47e28819915/03/05
>  12:12:45 INFO spark.HttpServer: Starting HTTP Server15/03/05 12:12:45 INFO 
> server.Server: jetty-8.y.z-SNAPSHOT15/03/05 12:12:45 INFO 
> server.AbstractConnector: Started SocketConnector@0.0.0.0:5645215/03/05 
> 12:12:45 INFO util.Utils: Successfully started service 'HTTP file server' on 
> port 56452.15/03/05 12:12:45 INFO server.Server: jetty-8.y.z-SNAPSHOT15/03/05 
> 12:12:45 INFO server.AbstractConnector: Started 
> SelectChannelConnector@0.0.0.0:404015/03/05 12:12:45 INFO util.Utils: 
> Successfully started service 'SparkUI' on port 4040.15/03/05 12:12:45 INFO 
> ui.SparkUI: Started SparkUI at 
> http://hadoopdev01.opsdatastore.com:404015/03/05 12:12:46 INFO 
> impl.TimelineClientImpl: Timeline service address: 
> http://hadoopdev02.opsdatastore.com:8188/ws/v1/t

Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-05 Thread Ted Yu
Please add the following to build command:
-Djackson.version=1.9.3

Cheers
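
Applied to the build command quoted above, that would look something like:

    ./make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4 \
      -Dhadoop.version=2.6.0 -Djackson.version=1.9.3 -Phive -Phive-thriftserver \
      -DskipTests clean package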

On Thu, Mar 5, 2015 at 10:04 AM, Todd Nist  wrote:

> I am running Spark on a HortonWorks HDP Cluster. I have deployed there
> prebuilt version but it is only for Spark 1.2.0 not 1.2.1 and there are a
> few fixes and features in there that I would like to leverage.
>
> I just downloaded the spark-1.2.1 source and built it to support Hadoop
> 2.6 by doing the following:
>
> radtech:spark-1.2.1 tnist$ ./make-distribution.sh --name hadoop2.6 --tgz 
> -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
> -DskipTests clean package
>
> When I deploy this to my hadoop cluster and kick of a spark-shell,
>
> $> spark-1.2.1-bin-hadoop2.6]# ./bin/spark-shell --master yarn-client 
> --driver-memory 512m --executor-memory 512m
>
> Results in  java.lang.NoClassDefFoundError:
> org/codehaus/jackson/map/deser/std/StdDeserializer
>
> The full stack trace is below. I have validate that the
> $SPARK_HOME/lib/spark-assembly-1.2.1-hadoop2.6.0.jar does infact contain
> the class in question:
>
> jar -tvf spark-assembly-1.2.1-hadoop2.6.0.jar | grep 
> 'org/codehaus/jackson/map/deser/std'
> ...
>  18002 Thu Mar 05 11:23:04 EST 2015  
> parquet/org/codehaus/jackson/map/deser/std/StdDeserializer.class
>   1584 Thu Mar 05 11:23:04 EST 2015 
> parquet/org/codehaus/jackson/map/deser/std/StdKeyDeserializer$BoolKD.class...
>
> Any guidance on what I missed? If I start the spark-shell in standalone
> mode ($SPARK_HOME/bin/spark-shell) it comes up fine, so it looks to be related
> to starting it under YARN from what I can tell.
>
> TIA for the assistance.
>
> -Todd
> Stack Trace
>
> 15/03/05 12:12:38 INFO spark.SecurityManager: Changing view acls to: root
> 15/03/05 12:12:38 INFO spark.SecurityManager: Changing modify acls to: root
> 15/03/05 12:12:38 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
> 15/03/05 12:12:38 INFO spark.HttpServer: Starting HTTP Server
> 15/03/05 12:12:39 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/03/05 12:12:39 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:36176
> 15/03/05 12:12:39 INFO util.Utils: Successfully started service 'HTTP class server' on port 36176.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.2.1
>   /_/
>
> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
> Type in expressions to have them evaluated.
> Type :help for more information.
> 15/03/05 12:12:43 INFO spark.SecurityManager: Changing view acls to: root
> 15/03/05 12:12:43 INFO spark.SecurityManager: Changing modify acls to: root
> 15/03/05 12:12:43 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
> 15/03/05 12:12:44 INFO slf4j.Slf4jLogger: Slf4jLogger started
> 15/03/05 12:12:44 INFO Remoting: Starting remoting
> 15/03/05 12:12:44 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@hadoopdev01.opsdatastore.com:50544]
> 15/03/05 12:12:44 INFO util.Utils: Successfully started service 'sparkDriver' on port 50544.
> 15/03/05 12:12:44 INFO spark.SparkEnv: Registering MapOutputTracker
> 15/03/05 12:12:44 INFO spark.SparkEnv: Registering BlockManagerMaster
> 15/03/05 12:12:44 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-16402794-cc1e-42d0-9f9c-99f15eaa1861/spark-118bc6af-4008-45d7-a22f-491bcd1856c0
> 15/03/05 12:12:44 INFO storage.MemoryStore: MemoryStore started with capacity 265.4 MB
> 15/03/05 12:12:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 15/03/05 12:12:45 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-5d7da34c-58d4-4d60-9b6a-3dce43cab39e/spark-4d65aacb-78bd-40fd-b6c0-53b47e288199
> 15/03/05 12:12:45 INFO spark.HttpServer: Starting HTTP Server
> 15/03/05 12:12:45 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/03/05 12:12:45 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:56452
> 15/03/05 12:12:45 INFO util.Utils: Successfully started service 'HTTP file server' on port 56452.
> 15/03/05 12:12:45 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 15/03/05 12:12:45 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
> 15/03/05 12:12:45 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
> 15/03/05 12:12:45 INFO ui.SparkUI: Started SparkUI at http://hadoopdev01.opsdatastore.com:4040
> 15/03/05 12:12:46 INFO impl.TimelineClientImpl: Timeline service address: http://hadoopdev02.opsdatastore.com:8188/ws/v1/timeline/
> java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
> at java.lang.ClassLoader.define

Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-05 Thread Sean Owen
Jackson 1.9.13? and codehaus.jackson.version? that's already set by
the profile hadoop-2.4.

On Thu, Mar 5, 2015 at 6:13 PM, Ted Yu  wrote:
> Please add the following to build command:
> -Djackson.version=1.9.3
>
> Cheers
>
> On Thu, Mar 5, 2015 at 10:04 AM, Todd Nist  wrote:
>>
>> I am running Spark on a HortonWorks HDP Cluster. I have deployed their
>> prebuilt version, but it is only for Spark 1.2.0, not 1.2.1, and there are a
>> few fixes and features in 1.2.1 that I would like to leverage.
>>
>> I just downloaded the spark-1.2.1 source and built it to support Hadoop
>> 2.6 by doing the following:
>>
>> radtech:spark-1.2.1 tnist$ ./make-distribution.sh --name hadoop2.6 --tgz
>> -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver
>> -DskipTests clean package
>>
>> When I deploy this to my hadoop cluster and kick off a spark-shell,
>>
>> $> spark-1.2.1-bin-hadoop2.6]# ./bin/spark-shell --master yarn-client
>> --driver-memory 512m --executor-memory 512m
>>
>> Results in  java.lang.NoClassDefFoundError:
>> org/codehaus/jackson/map/deser/std/StdDeserializer
>>
>> The full stack trace is below. I have validated that the
>> $SPARK_HOME/lib/spark-assembly-1.2.1-hadoop2.6.0.jar does in fact contain the
>> class in question:
>>
>> jar -tvf spark-assembly-1.2.1-hadoop2.6.0.jar | grep
>> 'org/codehaus/jackson/map/deser/std'
>>
>> ...
>>  18002 Thu Mar 05 11:23:04 EST 2015
>> parquet/org/codehaus/jackson/map/deser/std/StdDeserializer.class
>>   1584 Thu Mar 05 11:23:04 EST 2015
>> parquet/org/codehaus/jackson/map/deser/std/StdKeyDeserializer$BoolKD.class
>> ...
>>
>> Any guidance on what I missed? If I start the spark-shell in standalone
>> mode ($SPARK_HOME/bin/spark-shell) it comes up fine, so it looks to be related to
>> starting it under YARN from what I can tell.
>>
>> TIA for the assistance.
>>
>> -Todd
>>
>> Stack Trace
>>
>> 15/03/05 12:12:38 INFO spark.SecurityManager: Changing view acls to: root
>> 15/03/05 12:12:38 INFO spark.SecurityManager: Changing modify acls to:
>> root
>> 15/03/05 12:12:38 INFO spark.SecurityManager: SecurityManager:
>> authentication disabled; ui acls disabled; users with view permissions:
>> Set(root); users with modify permissions: Set(root)
>> 15/03/05 12:12:38 INFO spark.HttpServer: Starting HTTP Server
>> 15/03/05 12:12:39 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 15/03/05 12:12:39 INFO server.AbstractConnector: Started
>> SocketConnector@0.0.0.0:36176
>> 15/03/05 12:12:39 INFO util.Utils: Successfully started service 'HTTP
>> class server' on port 36176.
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/___/ .__/\_,_/_/ /_/\_\   version 1.2.1
>>   /_/
>>
>> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>> 15/03/05 12:12:43 INFO spark.SecurityManager: Changing view acls to: root
>> 15/03/05 12:12:43 INFO spark.SecurityManager: Changing modify acls to:
>> root
>> 15/03/05 12:12:43 INFO spark.SecurityManager: SecurityManager:
>> authentication disabled; ui acls disabled; users with view permissions:
>> Set(root); users with modify permissions: Set(root)
>> 15/03/05 12:12:44 INFO slf4j.Slf4jLogger: Slf4jLogger started
>> 15/03/05 12:12:44 INFO Remoting: Starting remoting
>> 15/03/05 12:12:44 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp://sparkdri...@hadoopdev01.opsdatastore.com:50544]
>> 15/03/05 12:12:44 INFO util.Utils: Successfully started service
>> 'sparkDriver' on port 50544.
>> 15/03/05 12:12:44 INFO spark.SparkEnv: Registering MapOutputTracker
>> 15/03/05 12:12:44 INFO spark.SparkEnv: Registering BlockManagerMaster
>> 15/03/05 12:12:44 INFO storage.DiskBlockManager: Created local directory
>> at
>> /tmp/spark-16402794-cc1e-42d0-9f9c-99f15eaa1861/spark-118bc6af-4008-45d7-a22f-491bcd1856c0
>> 15/03/05 12:12:44 INFO storage.MemoryStore: MemoryStore started with
>> capacity 265.4 MB
>> 15/03/05 12:12:45 WARN util.NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/03/05 12:12:45 INFO spark.HttpFileServer: HTTP File server directory is
>> /tmp/spark-5d7da34c-58d4-4d60-9b6a-3dce43cab39e/spark-4d65aacb-78bd-40fd-b6c0-53b47e288199
>> 15/03/05 12:12:45 INFO spark.HttpServer: Starting HTTP Server
>> 15/03/05 12:12:45 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 15/03/05 12:12:45 INFO server.AbstractConnector: Started
>> SocketConnector@0.0.0.0:56452
>> 15/03/05 12:12:45 INFO util.Utils: Successfully started service 'HTTP file
>> server' on port 56452.
>> 15/03/05 12:12:45 INFO server.Server: jetty-8.y.z-SNAPSHOT
>> 15/03/05 12:12:45 INFO server.AbstractConnector: Started
>> SelectChannelConnector@0.0.0.0:4040
>> 15/03/05 12:12:45 INFO util.Utils: Successfully started service 'SparkUI'
>> on port 4040.
>> 15/03/05 12:12:45 INFO ui.SparkUI: Started SparkUI at
>> http://hadoopdev01.opsdatastore.com:4040
>

IncompatibleClassChangeError

2015-03-05 Thread ey-chih chow
Hi,

I am using CDH5.3.2 now for a Spark project.  I got the following exception:

java.lang.IncompatibleClassChangeError: Found interface
org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected

I used all the CDH5.3.2 jar files in my pom file to generate the application
jar file.  What else should I do to fix the problem?

Ey-Chih Chow



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/IncompatibleClassChangeError-tp21934.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark v1.2.1 failing under BigTop build in External Flume Sink (due to missing Netty library)

2015-03-05 Thread Kelly, Jonathan
I'm running into an issue building Spark v1.2.1 (as well as the latest in 
branch-1.2 and v1.3.0-rc2 and the latest in branch-1.3) with BigTop (v0.9, 
which is not quite released yet).  The build fails in the External Flume Sink 
subproject with the following error:

[INFO] Compiling 5 Scala sources and 3 Java sources to 
/workspace/workspace/bigtop.spark-rpm/build/spark/rpm/BUILD/spark-1.3.0/external/flume-sink/target/scala-2.10/classes...
[WARNING] Class org.jboss.netty.channel.ChannelFactory not found - continuing 
with a stub.
[ERROR] error while loading NettyServer, class file 
'/home/ec2-user/.m2/repository/org/apache/avro/avro-ipc/1.7.6/avro-ipc-1.7.6.jar(org/apache/avro/ipc/NettyServer.class)'
 is broken
(class java.lang.NullPointerException/null)
[WARNING] one warning found
[ERROR] one error found

It seems like what is happening is that the Netty library is missing at build 
time, which happens because it is explicitly excluded in the pom.xml (see 
https://github.com/apache/spark/blob/v1.2.1/external/flume-sink/pom.xml#L42).  
I attempted removing the exclusions and the explicit re-add for the test scope 
on lines 77-88, and that allowed the build to succeed, though I don't know if 
that will cause problems at runtime.  I don't have any experience with the 
Flume Sink, so I don't really know how to test it.  (And, to be clear, I'm not 
necessarily trying to get the Flume Sink to work-- I just want the project to 
build successfully, though of course I'd still want the Flume Sink to work for 
whomever does need it.)

Does anybody have any idea what's going on here?  Here is the command BigTop is 
running to build Spark:

mvn -Pbigtop-dist -Pyarn -Phive -Phive-thriftserver -Pkinesis-asl 
-Divy.home=/home/ec2-user/.ivy2 -Dsbt.ivy.home=/home/ec2-user/.ivy2 
-Duser.home=/home/ec2-user -Drepo.maven.org= 
-Dreactor.repo=file:///home/ec2-user/.m2/repository 
-Dhadoop.version=2.4.0-amzn-3-SNAPSHOT -Dyarn.version=2.4.0-amzn-3-SNAPSHOT 
-Dprotobuf.version=2.5.0 -Dscala.version=2.10.3 -Dscala.binary.version=2.10 
-DskipTests -DrecompileMode=all install

As I mentioned above, if I switch to the latest in branch-1.2, to v1.3.0-rc2, 
or to the latest in branch-1.3, I get the same exact error.  I was not getting 
the error with Spark v1.1.0, though there weren't any changes to the 
external/flume-sink/pom.xml between v1.1.0 and v1.2.1.

~ Jonathan Kelly


Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-05 Thread Todd Nist
@Victor,

I'm pretty sure I built it correctly; I specified -Dhadoop.version=2.6.0.
Am I missing something here? I followed the docs on this, but I'm open to
suggestions.

make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4
*-Dhadoop.version=2.6.0* -Phive -Phive-thriftserver -DskipTests clean
package

@Ted
Well, it is building now with -Djackson.version=1.9.3; I can update in a
few minutes on whether it works.

@Sean
Since it is in the process of building, I will let it finish and try it out,
but do you see any other possible issues with the approach I have taken?

Thanks all for the quick responses.

-Todd

On Thu, Mar 5, 2015 at 1:20 PM, Sean Owen  wrote:

> Jackson 1.9.13? and codehaus.jackson.version? that's already set by
> the profile hadoop-2.4.
>
> On Thu, Mar 5, 2015 at 6:13 PM, Ted Yu  wrote:
> > Please add the following to build command:
> > -Djackson.version=1.9.3
> >
> > Cheers
> >
> > On Thu, Mar 5, 2015 at 10:04 AM, Todd Nist  wrote:
> >>
> >> I am running Spark on a HortonWorks HDP Cluster. I have deployed their
> >> prebuilt version, but it is only for Spark 1.2.0, not 1.2.1, and there are
> >> a few fixes and features in 1.2.1 that I would like to leverage.
> >>
> >> I just downloaded the spark-1.2.1 source and built it to support Hadoop
> >> 2.6 by doing the following:
> >>
> >> radtech:spark-1.2.1 tnist$ ./make-distribution.sh --name hadoop2.6 --tgz
> >> -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver
> >> -DskipTests clean package
> >>
> >> When I deploy this to my hadoop cluster and kick off a spark-shell,
> >>
> >> $> spark-1.2.1-bin-hadoop2.6]# ./bin/spark-shell --master yarn-client
> >> --driver-memory 512m --executor-memory 512m
> >>
> >> Results in  java.lang.NoClassDefFoundError:
> >> org/codehaus/jackson/map/deser/std/StdDeserializer
> >>
> >> The full stack trace is below. I have validated that the
> >> $SPARK_HOME/lib/spark-assembly-1.2.1-hadoop2.6.0.jar does in fact
> >> contain the
> >> class in question:
> >>
> >> jar -tvf spark-assembly-1.2.1-hadoop2.6.0.jar | grep
> >> 'org/codehaus/jackson/map/deser/std'
> >>
> >> ...
> >>  18002 Thu Mar 05 11:23:04 EST 2015
> >> parquet/org/codehaus/jackson/map/deser/std/StdDeserializer.class
> >>   1584 Thu Mar 05 11:23:04 EST 2015
> >>
> parquet/org/codehaus/jackson/map/deser/std/StdKeyDeserializer$BoolKD.class
> >> ...
> >>
> >> Any guidance on what I missed? If I start the spark-shell in standalone
> >> mode ($SPARK_HOME/bin/spark-shell) it comes up fine, so it looks to be related
> >> to starting it under YARN from what I can tell.
> >>
> >> TIA for the assistance.
> >>
> >> -Todd
> >>
> >> Stack Trace
> >>
> >> 15/03/05 12:12:38 INFO spark.SecurityManager: Changing view acls to:
> root
> >> 15/03/05 12:12:38 INFO spark.SecurityManager: Changing modify acls to:
> >> root
> >> 15/03/05 12:12:38 INFO spark.SecurityManager: SecurityManager:
> >> authentication disabled; ui acls disabled; users with view permissions:
> >> Set(root); users with modify permissions: Set(root)
> >> 15/03/05 12:12:38 INFO spark.HttpServer: Starting HTTP Server
> >> 15/03/05 12:12:39 INFO server.Server: jetty-8.y.z-SNAPSHOT
> >> 15/03/05 12:12:39 INFO server.AbstractConnector: Started
> >> SocketConnector@0.0.0.0:36176
> >> 15/03/05 12:12:39 INFO util.Utils: Successfully started service 'HTTP
> >> class server' on port 36176.
> >> Welcome to
> >>     __
> >>  / __/__  ___ _/ /__
> >> _\ \/ _ \/ _ `/ __/  '_/
> >>/___/ .__/\_,_/_/ /_/\_\   version 1.2.1
> >>   /_/
> >>
> >> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
> >> Type in expressions to have them evaluated.
> >> Type :help for more information.
> >> 15/03/05 12:12:43 INFO spark.SecurityManager: Changing view acls to:
> root
> >> 15/03/05 12:12:43 INFO spark.SecurityManager: Changing modify acls to:
> >> root
> >> 15/03/05 12:12:43 INFO spark.SecurityManager: SecurityManager:
> >> authentication disabled; ui acls disabled; users with view permissions:
> >> Set(root); users with modify permissions: Set(root)
> >> 15/03/05 12:12:44 INFO slf4j.Slf4jLogger: Slf4jLogger started
> >> 15/03/05 12:12:44 INFO Remoting: Starting remoting
> >> 15/03/05 12:12:44 INFO Remoting: Remoting started; listening on
> addresses
> >> :[akka.tcp://sparkdri...@hadoopdev01.opsdatastore.com:50544]
> >> 15/03/05 12:12:44 INFO util.Utils: Successfully started service
> >> 'sparkDriver' on port 50544.
> >> 15/03/05 12:12:44 INFO spark.SparkEnv: Registering MapOutputTracker
> >> 15/03/05 12:12:44 INFO spark.SparkEnv: Registering BlockManagerMaster
> >> 15/03/05 12:12:44 INFO storage.DiskBlockManager: Created local directory
> >> at
> >>
> /tmp/spark-16402794-cc1e-42d0-9f9c-99f15eaa1861/spark-118bc6af-4008-45d7-a22f-491bcd1856c0
> >> 15/03/05 12:12:44 INFO storage.MemoryStore: MemoryStore started with
> >> capacity 265.4 MB
> >> 15/03/05 12:12:45 WARN util.NativeCodeLoader: Unable to load
> native-hadoop
> >> library for your platform... using builtin-java classes

Re: IncompatibleClassChangeError

2015-03-05 Thread M. Dale
In Hadoop 1.x TaskAttemptContext is a class (for example, 
https://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapred/TaskAttemptContext.html)


In Hadoop 2.x TaskAttemptContext is an interface 
(https://hadoop.apache.org/docs/r2.4.0/api/org/apache/hadoop/mapreduce/TaskAttemptContext.html)


From
http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/cdh_vd_cdh_package_tarball.html
it looks like CDH 5.3.2 uses Hadoop 2.5.

Are you using any third party libraries that come in hadoop1 (default) vs.
hadoop2 versions like avro-mapred (see 
https://issues.apache.org/jira/browse/SPARK-3039)?


If so make sure you include:


<dependency>
  ...
  <classifier>hadoop2</classifier>
</dependency>

What version of Spark are you using? Are you using Avro? If so 
SPARK-3039 is fixed in Spark 1.3.
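
For illustration, the dependency block above might concretely look like the
following for avro-mapred. The 1.7.6 version here is only an assumption (it
matches the avro-ipc version mentioned elsewhere in this digest), so adjust it
to whatever your build actually uses:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.6</version> <!-- assumed version; use the one your build needs -->
  <classifier>hadoop2</classifier>
</dependency>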


Markus

On 03/05/2015 01:31 PM, ey-chih chow wrote:

Hi,

I am using CDH5.3.2 now for a Spark project.  I got the following exception:

java.lang.IncompatibleClassChangeError: Found interface
org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected

I used all the CDH5.3.2 jar files in my pom file to generate the application
jar file.  What else should I do to fix the problem?

Ey-Chih Chow



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/IncompatibleClassChangeError-tp21934.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Partitioning Dataset and Using Reduce in Apache Spark

2015-03-05 Thread Daniel Siegmann
An RDD is a Resilient *Distributed* Data set. The partitioning and
distribution of the data happens in the background. You'll occasionally
need to concern yourself with it (especially to get good performance), but
from an API perspective it's mostly invisible (some methods do allow you to
specify a number of partitions).

When you call sc.textFile(myPath) or similar, you get an RDD. That RDD will
be composed of a bunch of partitions, but you don't really need to worry
about that. The partitioning will be based on how the data is stored. When
you call a method that causes a shuffle (such as reduce), the data is
repartitioned into a number of partitions based on your default parallelism
setting (which IIRC is based on your number of cores if you haven't set it
explicitly).
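
As a small, hypothetical Scala sketch of that (the path and the partition hint
are made up; sc is an existing SparkContext):

val lines = sc.textFile("hdfs:///data/input.txt", 8) // ask for at least 8 partitions
println(lines.partitions.length)  // how the input was actually split
println(sc.defaultParallelism)    // used for shuffles unless you override it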

When you call reduce and similar methods, each partition can be reduced in
parallel. Then the results of that can be transferred across the network
and reduced to the final result. *You supply the function and Spark handles
the parallel execution of that function*.
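
A tiny, made-up Scala example of the same idea (the numbers and partition count
are arbitrary):

val rdd = sc.parallelize(1 to 1000000, 4) // 4 partitions, reduced in parallel
val sum = rdd.reduce(_ + _)               // partial results per partition, combined on the driver
println(sum)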

I hope this helps clear up your misconceptions. You might also want to
familiarize yourself with the collections API in Java 8 (or Scala, or
Python, or pretty much any other language with lambda expressions), since
RDDs are meant to have an API that feels similar.

On Thu, Mar 5, 2015 at 9:45 AM, raggy  wrote:

> I am trying to use Apache spark to load up a file, and distribute the file
> to
> several nodes in my cluster and then aggregate the results and obtain them.
> I don't quite understand how to do this.
>
> From my understanding the reduce action enables Spark to combine the
> results
> from different nodes and aggregate them together. Am I understanding this
> correctly?
>
> From a programming perspective, I don't understand how I would code this
> reduce function.
>
> How exactly do I partition the main dataset into N pieces and ask them to
> be
> parallel processed by using a list of transformations?
>
> reduce is supposed to take in two elements and a function for combining
> them. Are these 2 elements supposed to be RDDs from the context of Spark or
> can they be any type of element? Also, if you have N different partitions
> running parallel, how would reduce aggregate all their results into one
> final result(since the reduce function aggregates only 2 elements)?
>
> Also, I don't understand this example. The example from the spark website
> uses reduce, but I don't see the data being processed in parallel. So, what
> is the point of the reduce? If I could get a detailed explanation of the
> loop in this example, I think that would clear up most of my questions.
>
> class ComputeGradient extends Function<DataPoint, Vector> {
>   private Vector w;
>   ComputeGradient(Vector w) { this.w = w; }
>   public Vector call(DataPoint p) {
> return p.x.times(p.y * (1 / (1 + Math.exp(w.dot(p.x))) - 1));
>   }
> }
>
> JavaRDD<DataPoint> points = spark.textFile(...).map(new
> ParsePoint()).cache();
> Vector w = Vector.random(D); // current separating plane
> for (int i = 0; i < ITERATIONS; i++) {
>   Vector gradient = points.map(new ComputeGradient(w)).reduce(new
> AddVectors());
>   w = w.subtract(gradient);
> }
> System.out.println("Final separating plane: " + w);
>
> Also, I have been trying to find the source code for reduce from the Apache
> Spark Github, but the source is pretty huge and I haven't been able to
> pinpoint it. Could someone please direct me towards which file I could find
> it in?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Partitioning-Dataset-and-Using-Reduce-in-Apache-Spark-tp21933.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-05 Thread Marcelo Vanzin
It seems from the excerpt below that your cluster is set up to use the
Yarn ATS, and the code is failing in that path. I think you'll need to
apply the following patch to your Spark sources if you want this to
work:

https://github.com/apache/spark/pull/3938

On Thu, Mar 5, 2015 at 10:04 AM, Todd Nist  wrote:
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:166)
> at
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
> at
> org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:65)
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
> at org.apache.spark.SparkContext.(SparkContext.scala:348)

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Training Random Forest

2015-03-05 Thread drarse
I am testing the Random Forest in Spark, but I have a question... If I train
for a second time, will it update the decision trees already created, or are
they created anew? That is, will the system continually learn from each
dataset, or only from the first?

Thanks for everything



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Training-Random-Forest-tp21935.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark v1.2.1 failing under BigTop build in External Flume Sink (due to missing Netty library)

2015-03-05 Thread Kelly, Jonathan
I confirmed that this has nothing to do with BigTop by running the same mvn 
command directly in a fresh clone of the Spark package at the v1.2.1 tag.  I 
got the same exact error.

Jonathan Kelly
Elastic MapReduce - SDE
Port 99 (SEA35) 08.220.C2

From: Jonathan Kelly <jonat...@amazon.com>
Date: Thursday, March 5, 2015 at 10:39 AM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Spark v1.2.1 failing under BigTop build in External Flume Sink (due to 
missing Netty library)

I'm running into an issue building Spark v1.2.1 (as well as the latest in 
branch-1.2 and v1.3.0-rc2 and the latest in branch-1.3) with BigTop (v0.9, 
which is not quite released yet).  The build fails in the External Flume Sink 
subproject with the following error:

[INFO] Compiling 5 Scala sources and 3 Java sources to 
/workspace/workspace/bigtop.spark-rpm/build/spark/rpm/BUILD/spark-1.3.0/external/flume-sink/target/scala-2.10/classes...
[WARNING] Class org.jboss.netty.channel.ChannelFactory not found - continuing 
with a stub.
[ERROR] error while loading NettyServer, class file 
'/home/ec2-user/.m2/repository/org/apache/avro/avro-ipc/1.7.6/avro-ipc-1.7.6.jar(org/apache/avro/ipc/NettyServer.class)'
 is broken
(class java.lang.NullPointerException/null)
[WARNING] one warning found
[ERROR] one error found

It seems like what is happening is that the Netty library is missing at build 
time, which happens because it is explicitly excluded in the pom.xml (see 
https://github.com/apache/spark/blob/v1.2.1/external/flume-sink/pom.xml#L42).  
I attempted removing the exclusions and the explicit re-add for the test scope 
on lines 77-88, and that allowed the build to succeed, though I don't know if 
that will cause problems at runtime.  I don't have any experience with the 
Flume Sink, so I don't really know how to test it.  (And, to be clear, I'm not 
necessarily trying to get the Flume Sink to work-- I just want the project to 
build successfully, though of course I'd still want the Flume Sink to work for 
whomever does need it.)

Does anybody have any idea what's going on here?  Here is the command BigTop is 
running to build Spark:

mvn -Pbigtop-dist -Pyarn -Phive -Phive-thriftserver -Pkinesis-asl 
-Divy.home=/home/ec2-user/.ivy2 -Dsbt.ivy.home=/home/ec2-user/.ivy2 
-Duser.home=/home/ec2-user -Drepo.maven.org= 
-Dreactor.repo=file:///home/ec2-user/.m2/repository 
-Dhadoop.version=2.4.0-amzn-3-SNAPSHOT -Dyarn.version=2.4.0-amzn-3-SNAPSHOT 
-Dprotobuf.version=2.5.0 -Dscala.version=2.10.3 -Dscala.binary.version=2.10 
-DskipTests -DrecompileMode=all install

As I mentioned above, if I switch to the latest in branch-1.2, to v1.3.0-rc2, 
or to the latest in branch-1.3, I get the same exact error.  I was not getting 
the error with Spark v1.1.0, though there weren't any changes to the 
external/flume-sink/pom.xml between v1.1.0 and v1.2.1.

~ Jonathan Kelly


Re: Spark v1.2.1 failing under BigTop build in External Flume Sink (due to missing Netty library)

2015-03-05 Thread Patrick Wendell
You may need to add the -Phadoop-2.4 profile. When building or release
packages for Hadoop 2.4 we use the following flags:

-Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn

- Patrick

On Thu, Mar 5, 2015 at 12:47 PM, Kelly, Jonathan  wrote:
> I confirmed that this has nothing to do with BigTop by running the same mvn
> command directly in a fresh clone of the Spark package at the v1.2.1 tag.  I
> got the same exact error.
>
>
> Jonathan Kelly
>
> Elastic MapReduce - SDE
>
> Port 99 (SEA35) 08.220.C2
>
>
> From: , Jonathan Kelly 
> Date: Thursday, March 5, 2015 at 10:39 AM
> To: "user@spark.apache.org" 
> Subject: Spark v1.2.1 failing under BigTop build in External Flume Sink (due
> to missing Netty library)
>
> I'm running into an issue building Spark v1.2.1 (as well as the latest in
> branch-1.2 and v1.3.0-rc2 and the latest in branch-1.3) with BigTop (v0.9,
> which is not quite released yet).  The build fails in the External Flume
> Sink subproject with the following error:
>
> [INFO] Compiling 5 Scala sources and 3 Java sources to
> /workspace/workspace/bigtop.spark-rpm/build/spark/rpm/BUILD/spark-1.3.0/external/flume-sink/target/scala-2.10/classes...
> [WARNING] Class org.jboss.netty.channel.ChannelFactory not found -
> continuing with a stub.
> [ERROR] error while loading NettyServer, class file
> '/home/ec2-user/.m2/repository/org/apache/avro/avro-ipc/1.7.6/avro-ipc-1.7.6.jar(org/apache/avro/ipc/NettyServer.class)'
> is broken
> (class java.lang.NullPointerException/null)
> [WARNING] one warning found
> [ERROR] one error found
>
> It seems like what is happening is that the Netty library is missing at
> build time, which happens because it is explicitly excluded in the pom.xml
> (see
> https://github.com/apache/spark/blob/v1.2.1/external/flume-sink/pom.xml#L42).
> I attempted removing the exclusions and the explicit re-add for the test
> scope on lines 77-88, and that allowed the build to succeed, though I don't
> know if that will cause problems at runtime.  I don't have any experience
> with the Flume Sink, so I don't really know how to test it.  (And, to be
> clear, I'm not necessarily trying to get the Flume Sink to work-- I just
> want the project to build successfully, though of course I'd still want the
> Flume Sink to work for whomever does need it.)
>
> Does anybody have any idea what's going on here?  Here is the command BigTop
> is running to build Spark:
>
> mvn -Pbigtop-dist -Pyarn -Phive -Phive-thriftserver -Pkinesis-asl
> -Divy.home=/home/ec2-user/.ivy2 -Dsbt.ivy.home=/home/ec2-user/.ivy2
> -Duser.home=/home/ec2-user -Drepo.maven.org=
> -Dreactor.repo=file:///home/ec2-user/.m2/repository
> -Dhadoop.version=2.4.0-amzn-3-SNAPSHOT -Dyarn.version=2.4.0-amzn-3-SNAPSHOT
> -Dprotobuf.version=2.5.0 -Dscala.version=2.10.3 -Dscala.binary.version=2.10
> -DskipTests -DrecompileMode=all install
>
> As I mentioned above, if I switch to the latest in branch-1.2, to
> v1.3.0-rc2, or to the latest in branch-1.3, I get the same exact error.  I
> was not getting the error with Spark v1.1.0, though there weren't any
> changes to the external/flume-sink/pom.xml between v1.1.0 and v1.2.1.
>
>
> ~ Jonathan Kelly

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark v1.2.1 failing under BigTop build in External Flume Sink (due to missing Netty library)

2015-03-05 Thread Kelly, Jonathan
That's probably a good thing to have, so I'll add it, but unfortunately it
did not help this issue.  It looks like the hadoop-2.4 profile only sets
these properties, which don't seem like they would affect anything related
to Netty:

  
<properties>
  <hadoop.version>2.4.0</hadoop.version>
  <protobuf.version>2.5.0</protobuf.version>
  <jets3t.version>0.9.0</jets3t.version>
  <commons.math3.version>3.1.1</commons.math3.version>
  <avro.mapred.classifier>hadoop2</avro.mapred.classifier>
</properties>
  


Thanks,
Jonathan Kelly




On 3/5/15, 1:09 PM, "Patrick Wendell"  wrote:

>You may need to add the -Phadoop-2.4 profile. When building or release
>packages for Hadoop 2.4 we use the following flags:
>
>-Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn
>
>- Patrick
>
>On Thu, Mar 5, 2015 at 12:47 PM, Kelly, Jonathan 
>wrote:
>> I confirmed that this has nothing to do with BigTop by running the same
>>mvn
>> command directly in a fresh clone of the Spark package at the v1.2.1
>>tag.  I
>> got the same exact error.
>>
>>
>> Jonathan Kelly
>>
>> Elastic MapReduce - SDE
>>
>> Port 99 (SEA35) 08.220.C2
>>
>>
>> From: , Jonathan Kelly 
>> Date: Thursday, March 5, 2015 at 10:39 AM
>> To: "user@spark.apache.org" 
>> Subject: Spark v1.2.1 failing under BigTop build in External Flume Sink
>>(due
>> to missing Netty library)
>>
>> I'm running into an issue building Spark v1.2.1 (as well as the latest
>>in
>> branch-1.2 and v1.3.0-rc2 and the latest in branch-1.3) with BigTop
>>(v0.9,
>> which is not quite released yet).  The build fails in the External Flume
>> Sink subproject with the following error:
>>
>> [INFO] Compiling 5 Scala sources and 3 Java sources to
>> 
>>/workspace/workspace/bigtop.spark-rpm/build/spark/rpm/BUILD/spark-1.3.0/e
>>xternal/flume-sink/target/scala-2.10/classes...
>> [WARNING] Class org.jboss.netty.channel.ChannelFactory not found -
>> continuing with a stub.
>> [ERROR] error while loading NettyServer, class file
>> 
>>'/home/ec2-user/.m2/repository/org/apache/avro/avro-ipc/1.7.6/avro-ipc-1.
>>7.6.jar(org/apache/avro/ipc/NettyServer.class)'
>> is broken
>> (class java.lang.NullPointerException/null)
>> [WARNING] one warning found
>> [ERROR] one error found
>>
>> It seems like what is happening is that the Netty library is missing at
>> build time, which happens because it is explicitly excluded in the
>>pom.xml
>> (see
>> 
>>https://github.com/apache/spark/blob/v1.2.1/external/flume-sink/pom.xml#L
>>42).
>> I attempted removing the exclusions and the explicit re-add for the test
>> scope on lines 77-88, and that allowed the build to succeed, though I
>>don't
>> know if that will cause problems at runtime.  I don't have any
>>experience
>> with the Flume Sink, so I don't really know how to test it.  (And, to be
>> clear, I'm not necessarily trying to get the Flume Sink to work-- I just
>> want the project to build successfully, though of course I'd still want
>>the
>> Flume Sink to work for whomever does need it.)
>>
>> Does anybody have any idea what's going on here?  Here is the command
>>BigTop
>> is running to build Spark:
>>
>> mvn -Pbigtop-dist -Pyarn -Phive -Phive-thriftserver -Pkinesis-asl
>> -Divy.home=/home/ec2-user/.ivy2 -Dsbt.ivy.home=/home/ec2-user/.ivy2
>> -Duser.home=/home/ec2-user -Drepo.maven.org=
>> -Dreactor.repo=file:///home/ec2-user/.m2/repository
>> -Dhadoop.version=2.4.0-amzn-3-SNAPSHOT
>>-Dyarn.version=2.4.0-amzn-3-SNAPSHOT
>> -Dprotobuf.version=2.5.0 -Dscala.version=2.10.3
>>-Dscala.binary.version=2.10
>> -DskipTests -DrecompileMode=all install
>>
>> As I mentioned above, if I switch to the latest in branch-1.2, to
>> v1.3.0-rc2, or to the latest in branch-1.3, I get the same exact error.
>> I
>> was not getting the error with Spark v1.1.0, though there weren't any
>> changes to the external/flume-sink/pom.xml between v1.1.0 and v1.2.1.
>>
>>
>> ~ Jonathan Kelly


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: LBGFS optimizer performace

2015-03-05 Thread DB Tsai
PS: I would recommend compressing the data when you cache the RDD.
There will be some overhead in compression/decompression and
serialization/deserialization, but it helps a lot for iterative
algorithms by letting you cache more data.
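
A minimal, hypothetical sketch of that setup (the app name and input path are
made up; spark.rdd.compress and a serialized storage level are the relevant knobs):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("lbfgs-with-compressed-cache")
  .set("spark.rdd.compress", "true") // compress serialized cached partitions
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

val data = sc.textFile("hdfs:///path/to/training/data")
data.persist(StorageLevel.MEMORY_ONLY_SER) // serialized (and, with the flag above, compressed) cache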

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com


On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres
 wrote:
> Yeah, I can call count before that and it works. Also I was over caching
> tables but I removed those. Now there is no caching but it gets really slow
> since it calculates my table RDD many times.
> Also hacked the LBFGS code to pass the number of examples which I calculated
> outside in a Spark SQL query but just moved the location of the problem.
>
> The query I'm running looks like this:
>
> s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB  ON
> tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "
>
> mappedFields contains a list of fields which I'm interested in. The result
> of that query goes through (including sampling) some transformations before
> being input to LBFGS.
>
> My dataset has 180GB just for feature selection, I'm planning to use 450GB
> to train the final model and I'm using 16 c3.2xlarge EC2 instances, that
> means I have 240GB of RAM available.
>
> Any suggestion? I'm starting to check the algorithm because I don't
> understand why it needs to count the dataset.
>
> Thanks
>
> Gustavo
>
> On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley 
> wrote:
>>
>> Is that error actually occurring in LBFGS?  It looks like it might be
>> happening before the data even gets to LBFGS.  (Perhaps the outer join
>> you're trying to do is making the dataset size explode a bit.)  Are you able
>> to call count() (or any RDD action) on the data before you pass it to LBFGS?
>>
>> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres
>>  wrote:
>>>
>>> Just did with the same error.
>>> I think the problem is the "data.count()" call in LBFGS because for huge
>>> datasets that's naive to do.
>>> I was thinking to write my version of LBFGS but instead of doing
>>> data.count() I will pass that parameter which I will calculate from a Spark
>>> SQL query.
>>>
>>> I will let you know.
>>>
>>> Thanks
>>>
>>>
>>> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das 
>>> wrote:

 Can you try increasing your driver memory, reducing the executors and
 increasing the executor memory?

 Thanks
 Best Regards

 On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres
  wrote:
>
> Hi there:
>
> I'm using LBFGS optimizer to train a logistic regression model. The
> code I implemented follows the pattern showed in
> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html but training
> data is obtained from a Spark SQL RDD.
> The problem I'm having is that LBFGS tries to count the elements in my
> RDD and that results in a OOM exception since my dataset is huge.
> I'm running on a AWS EMR cluster with 16 c3.2xlarge instances on Hadoop
> YARN. My dataset is about 150 GB but I sample (I take only 1% of the data)
> it in order to scale logistic regression.
> The exception I'm getting is this:
>
> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in
> stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOfRange(Arrays.java:2694)
> at java.lang.String.(String.java:203)
> at
> com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
> at
> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
> at
> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
> at
> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
> at
> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> at
> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
> at
> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
> at
> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> at
> org.a

RE: spark standalone with multiple executors in one work node

2015-03-05 Thread Judy Nash
I meant from one app, yes.

I was asking this because our previous tuning experiments show that Spark-on-YARN runs
faster when overloading workers with executors (i.e. if a worker has 4 cores,
creating 2 executors that each use 4 cores gives a speed boost over 1 executor
with 4 cores).

I have found an equivalent solution for standalone that has given me a speed
boost. Instead of adding more executors, I overloaded SPARK_WORKER_CORES to 2x
the number of CPU cores on the worker. We are seeing better performance because the CPU now has
consistent 100% utilization.
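
For reference, a sketch of that overcommit in conf/spark-env.sh on each worker
(the core counts are assumptions for illustration):

# conf/spark-env.sh (standalone worker with 8 physical cores)
# Advertise 2x the physical cores so more tasks get scheduled per node
export SPARK_WORKER_CORES=16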

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Thursday, February 26, 2015 2:11 AM
To: Judy Nash
Cc: user@spark.apache.org
Subject: Re: spark standalone with multiple executors in one work node

--num-executors is the total number of executors. In YARN there is not quite 
the same notion of a Spark worker. Of course, one worker has an executor for 
each running app, so yes, but you mean for one app? it's possible, though not 
usual, to run multiple executors for one app on one worker. This may be useful 
if your executor heap size is otherwise getting huge.

On Thu, Feb 26, 2015 at 1:58 AM, Judy Nash  
wrote:
> Hello,
>
>
>
> Does spark standalone support running multiple executors in one worker node?
>
>
>
> It seems yarn has the parameter --num-executors  to set number of 
> executors to deploy, but I do not find the equivalent parameter in spark 
> standalone.
>
>
>
>
>
> Thanks,
>
> Judy


Question about the spark assembly deployed to the cluster with the ec2 scripts

2015-03-05 Thread Darin McBeath
I've downloaded spark  1.2.0 to my laptop.  In the lib directory, it includes 
spark-assembly-1.2.0-hadoop2.4.0.jar

When I spin up a cluster using the ec2 scripts with 1.2.0 (and set 
--hadoop-major-version=2) I notice that in the lib directory for the 
master/slaves the assembly is for hadoop2.0.0 (and I think Cloudera).

Is there a way that I can force the same assembly that comes with the 1.2
download of Spark to be installed on the cluster?

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Identify the performance bottleneck from hardware prospective

2015-03-05 Thread Julaiti Alafate
Hi Mitch,

I think it is normal. The network utilization will be high when there is
some shuffling process happening. After that, the network utilization
should come down, while each slave node will do the computation on the
partitions assigned to them. At least it is my understanding.

Best,
Julaiti


On Tue, Mar 3, 2015 at 2:32 AM, Mitch Gusat  wrote:

> Hi Julaiti,
>
> Have you made progress in discovering the bottleneck below?
>
> While I suspect a configuration setting or program bug, I'm intrigued by
> "network
> utilization is high for several seconds at the beginning, then drop close
> to 0"... Do you know more?
>
> thanks,
> Mitch Gusat (IBM research)
>
> On Tue, Feb 17, 2015 at 11:20 AM, Julaiti Alafate 
> wrote:
>
> > Hi there,
> >
> > I am trying to scale up the data size that my application is handling.
> > This application is running on a cluster with 16 slave nodes. Each slave
> > node has 60GB memory. It is running in standalone mode. The data is coming
> > from HDFS that also in same local network.
> >
> > In order to have an understanding on how my program is running, I also had
> > a Ganglia installed on the cluster. From previous run, I know the stage
> > that taking longest time to run is counting word pairs (my RDD consists of
> > sentences from a corpus). My goal is to identify the bottleneck of my
> > application, then modify my program or hardware configurations according to
> > that.
> >
> > Unfortunately, I didn't find too much information on Spark monitoring and
> > optimization topics. Reynold Xin gave a great talk on Spark Summit 2014 for
> > application tuning from tasks perspective. Basically, his focus is on tasks
> > that oddly slower than the average. However, it didn't solve my problem
> > because there is no such tasks that run way slow than others in my case.
> >
> > So I tried to identify the bottleneck from hardware prospective. I want to
> > know what the limitation of the cluster is. I think if the executers are
> > running hard, either CPU, memory or network bandwidth (or maybe the
> > combinations) is hitting the roof. But Ganglia reports the CPU utilization
> > of cluster is no more than 50%, network utilization is high for several
> > seconds at the beginning, then drop close to 0. From Spark UI, I can see
> > the nodes with maximum memory usage is consuming around 6GB, while
> > "spark.executor.memory" is set to be 20GB.
> >
> > I am very confused that the program is not running fast enough, while
> > hardware resources are not in shortage. Could you please give me some hints
> > about what decides the performance of a Spark application from hardware
> > perspective?
> >
> > Thanks!
> >
> > Julaiti
>
>


Re: Identify the performance bottleneck from hardware prospective

2015-03-05 Thread jalafate
Hi David,

It is a great point. It is actually one of the reasons that my program is
slow. I found that the major cause of my program running slowly was the huge
garbage collection time. I created too many small objects in the map
procedure, which triggered the GC mechanism frequently. After I improved my
program by creating fewer objects, the performance is much better.
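
As a made-up illustration of that kind of change (not my original code; it
assumes an RDD[String] named lines whose records start with a yyyy-MM-dd date):

// Allocation-heavy: a new formatter object for every record
val parsed1 = lines.map { line =>
  val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd") // one object per record
  fmt.parse(line.substring(0, 10)).getTime
}

// Gentler on the GC: create the object once per partition and reuse it
val parsed2 = lines.mapPartitions { iter =>
  val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd") // one object per partition
  iter.map(line => fmt.parse(line.substring(0, 10)).getTime)
}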

Here are two videos that may help other people who are also struggling to find
the bottleneck of their Spark applications.

1. A Deeper Understanding of Spark Internals - Aaron Davidson (Databricks)
http://youtu.be/dmL0N3qfSc8

2. Spark Summit 2014 - Advanced Spark Training - Advanced Spark Internals
and Tuning
http://youtu.be/HG2Yd-3r4-M

I personally learned a lot from the points mentioned in the two videos
above.

In practice, I will monitor CPU user time, CPU idle time (if disk IO is the
bottleneck, CPU idle time should be significant), memory usage, network IO
and garbage collection time per task (can be found on the Spark web UI).
Ganglia will be helpful to monitor CPU, memory and network IO.

Best,
Julaiti



On Thu, Mar 5, 2015 at 1:39 AM, davidkl [via Apache Spark User List] <
ml-node+s1001560n21927...@n3.nabble.com> wrote:

> Hello Julaiti,
>
> Maybe I am just asking the obvious :-) but did you check disk IO?
> Depending on what you are doing that could be the bottleneck.
>
> In my case none of the HW resources was a bottleneck, but using some
> distributed features that were blocking execution (e.g. Hazelcast). Could
> that be your case as well?
>
> Regards
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Identify-the-performance-bottleneck-from-hardware-prospective-tp21684p21927.html




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Identify-the-performance-bottleneck-from-hardware-prospective-tp21684p21937.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark Streaming - Duration 1s not matching reality

2015-03-05 Thread eleroy
Hello,

Getting started with Spark. 
Got JavaNetworkWordcount working on a 3 node cluster. netcat on  with an
infinite loop printing random numbers 0-100

With a duration of 1sec, I do see a list of (word, count) values every
second. The list is limited to 10 values (as per the docs)

The count is ~6000 counts per number. I assume that since my input is random
numbers from 0 to 100, and I count 6000 for each, the distribution being
homogeneous, that would mean 600,000 values are being ingested.
I switch to using a constant number, and then I'm seeing between 200,000 and
2,000,000 counts, but the console response is erratic: it's not 1sec
anymore, it's sometimes 2, sometimes more, and sometimes much faster... 

I am looking to do 1-to-1 processing (one value outputs one result) so I
replaced the flatMap function with a map function, and do my calculation. 

Now I'd like to know how many events I was able to process but it's not
clear at all:
If I use print, it's fast again (1sec) but I only see the first 10 results. 
I was trying to add a counter... and realized the counter only seems to
increment by 11 each time

This is very confusing... It looks like the counter is only incremented on
the elements affected by the print statement... so does that mean the other
values are not even calculated until requested?

If I use .count() on the output RDD, then I do see a realistic count, but
then it doesn't take 1sec anymore: it's more like 4 to 5sec to get 600,000 -
1,000,000 events counted.

I'm not sure where to go from here or how to benchmark the time to actually
process the events

Any hint or useful link would be appreciated.
Thanks for your help.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Duration-1s-not-matching-reality-tp21938.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Cui Lin
Hi, Helena,

I think your new version only works for JSON that has a very limited number of columns. I
couldn't find MonthlyCommits, but I assume it only has a small number of columns
that are defined manually.
In my case, I have hundreds of column names, so it is not feasible to define a
class for these columns.

Is there any way to get the column names instead of hard-coding "time", as in this case?
 mapper.readValue[Map[String,Any]](x).get("time")

Best regards,

Cui Lin

From: Helena Edelson <helena.edel...@datastax.com>
Date: Thursday, March 5, 2015 at 7:02 AM
To: Ted Yu <yuzhih...@gmail.com>
Cc: Akhil Das <ak...@sigmoidanalytics.com>, Cui Lin <cui@hds.com>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to parse Json formatted Kafka message in spark streaming

Great point :) Cui, Here’s a cleaner way than I had before, w/out the use of 
spark sql for the mapping:


KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
  .map{ case (k,v) => JsonParser.parse(v).extract[MonthlyCommits]}
  .saveToCassandra("githubstats","monthly_commits")

HELENA EDELSON
Senior Software Engineer, DSE Analytics

On Mar 5, 2015, at 9:33 AM, Ted Yu <yuzhih...@gmail.com> wrote:

Cui:
You can check messages.partitions.size to determine whether messages is an 
empty RDD.

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
When you use KafkaUtils.createStream with StringDecoders, it will return String 
objects inside your messages stream. To access the elements from the json, you 
could do something like the following:


   val mapStream = messages.map(x=> {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)

  mapper.readValue[Map[String,Any]](x).get("time")
})



Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui@hds.com> wrote:
Friends,

I'm trying to parse json formatted Kafka messages and then send back to 
cassandra.I have two problems:

  1.  I got the exception below. How to check an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty 
collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](…)

messages.foreachRDD { rdd =>
  val message:RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time,To FROM tempTable")
.saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
"msg"))
}

2. how to get all column names from json messages? I have hundreds of columns 
in the json formatted message.

Thanks for your help!




Best regards,

Cui Lin





Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Cui Lin
Hi, Ted,

Thanks for your reply. I noticed from the link below that partitions.size will not
work for checking for an empty RDD in streams. It seems that the problem is solved
in Spark 1.3, which cannot be downloaded at this time?

https://issues.apache.org/jira/browse/SPARK-5270
Best regards,

Cui Lin

From: Ted Yu <yuzhih...@gmail.com>
Date: Thursday, March 5, 2015 at 6:33 AM
To: Akhil Das <ak...@sigmoidanalytics.com>
Cc: Cui Lin <cui@hds.com>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to parse Json formatted Kafka message in spark streaming

Cui:
You can check messages.partitions.size to determine whether messages is an 
empty RDD.

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
When you use KafkaUtils.createStream with StringDecoders, it will return String 
objects inside your messages stream. To access the elements from the json, you 
could do something like the following:


   val mapStream = messages.map(x=> {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)

  mapper.readValue[Map[String,Any]](x).get("time")
})



Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui@hds.com> wrote:
Friends,

I'm trying to parse json formatted Kafka messages and then send back to 
cassandra.I have two problems:

  1.  I got the exception below. How to check an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty 
collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](…)

messages.foreachRDD { rdd =>
  val message:RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time,To FROM tempTable")
.saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
"msg"))
}

2. how to get all column names from json messages? I have hundreds of columns 
in the json formatted message.

Thanks for your help!




Best regards,

Cui Lin




Writing to S3 and retrieving folder names

2015-03-05 Thread Mike Trienis
Hi All,

I am receiving data from AWS Kinesis using Spark Streaming and am writing
the data collected in the dstream to s3 using output function:

dstreamData.saveAsTextFiles("s3n://XXX:XXX@/")

After running the application for several seconds, I end up with a sequence
of directories in S3 that look like [PREFIX]-1425597204000.

At the same time I'd like to run a copy command on Redshift that pulls over
the exported data. The problem is that I am not sure how to extract the
folder names from the dstream object in order to construct the appropriate
COPY command.

https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.streaming.dstream.DStream

Anyone have any ideas?

Thanks, Mike.


Re: Spark Streaming - Duration 1s not matching reality

2015-03-05 Thread Tathagata Das
Hint: Print() just gives a sample of what is in the data, and does not
enforce the processing on all the data (only the first partition of the rdd
is computed to get 10 items). Count() actually processes all the data. This
is all due to lazy eval: if you don't need to use all the data, don't
compute all the data :)
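
A rough sketch of the difference (dstream is a placeholder name for any DStream):

dstream.print()          // computes only enough of each RDD to show ~10 elements
dstream.count().print()  // forces a full count of every record in each batch
dstream.foreachRDD(rdd => println(s"batch size = ${rdd.count()}")) // same, with full control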

HTH

TD

On Thu, Mar 5, 2015 at 3:10 PM, eleroy  wrote:

> Hello,
>
> Getting started with Spark.
> Got JavaNetworkWordcount working on a 3 node cluster. netcat on  with a
> infinite loop printing random numbers 0-100
>
> With a duration of 1sec, I do see a list of (word, count) values every
> second. The list is limited to 10 values (as per the docs)
>
> The count is ~6000 counts per number. I assume that since my input is
> random
> numbers from 0 to 100, and i count 6000 for each, the distribution being
> homogeneous, that would mean 600,000 values are being ingested.
> I switch to using a constant number, and then I'm seeing between 200,000
> and
> 2,000,000 counts, but the console response is erratic: it's not 1sec
> anymore, it's sometimes 2, sometimes more, and sometimes much faster...
>
> I am looking to do 1-to-1 processing (one value outputs one result) so I
> replaced the flatMap function with a map function, and do my calculation.
>
> Now I'd like to know how many events I was able to process but it's not
> clear at all:
> If I use print, it's fast again (1sec) but I only see the first 10 results.
> I was trying to add a counter... and realize the counter only seem to
> increment by only 11 each time
>
> This is very confusing... It looks like the counter is only incremented on
> the elements affected by the print statement... so does that mean the other
> values are not even calculated until requested?
>
> If i use .count() on the output RDD, then I do see a realistic count, but
> then it doesn't take 1sec anymore: it's more 4 to 5sec to get 600,000 -
> 1,000,000 events counted.
>
> I'm not sure where to get from here or how to benchmark the time to
> actually
> process the events
>
> Any hint or useful link would be appreciated.
> Thanks for your help.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Duration-1s-not-matching-reality-tp21938.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Driver disassociated

2015-03-05 Thread Thomas Gerber
Thanks.
I was already setting those (and I checked they were in use through the
environment tab in the UI).

They were set at 10 times their default values: 60000 and 10000 respectively.

I'll start poking at spark.shuffle.io.retryWait.
Thanks!
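
For reference, a sketch of setting all of these in one place (the values are just
the 10x figures above plus an illustrative retryWait, not a recommendation):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.akka.timeout", "1000")
  .set("spark.akka.heartbeat.pauses", "60000")
  .set("spark.akka.heartbeat.interval", "10000")
  .set("spark.akka.failure-detector.threshold", "3000.0")
  .set("spark.shuffle.io.retryWait", "30")   // seconds; the default was 5, if I remember correctly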

On Wed, Mar 4, 2015 at 7:02 PM, Ted Yu  wrote:

> See this thread:
> https://groups.google.com/forum/#!topic/akka-user/X3xzpTCbEFs
>
> Here're the relevant config parameters in Spark:
> val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses",
> 6000)
> val akkaHeartBeatInterval =
> conf.getInt("spark.akka.heartbeat.interval", 1000)
>
> Cheers
>
> On Wed, Mar 4, 2015 at 4:09 PM, Thomas Gerber 
> wrote:
>
>> Also,
>>
>> I was experiencing another problem which might be related:
>> "Error communicating with MapOutputTracker" (see email in the ML today).
>>
>> I just thought I would mention it in case it is relevant.
>>
>> On Wed, Mar 4, 2015 at 4:07 PM, Thomas Gerber 
>> wrote:
>>
>>> 1.2.1
>>>
>>> Also, I was using the following parameters, which are 10 times the
>>> default ones:
>>> spark.akka.timeout 1000
>>> spark.akka.heartbeat.pauses 60000
>>> spark.akka.failure-detector.threshold 3000.0
>>> spark.akka.heartbeat.interval 10000
>>>
>>> which should have helped *avoid* the problem if I understand correctly.
>>>
>>> Thanks,
>>> Thomas
>>>
>>> On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu  wrote:
>>>
 What release are you using ?

 SPARK-3923 went into 1.2.0 release.

 Cheers

 On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber >>> > wrote:

> Hello,
>
> sometimes, in the *middle* of a job, the job stops (status is then
> seen as FINISHED in the master).
>
> There isn't anything wrong in the shell/submit output.
>
> When looking at the executor logs, I see logs like this:
>
> 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch;
> tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
> :40019/user/MapOutputTracker#893807065]
> 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs
> for shuffle 38, fetching them
> 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver
> Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766]
> -> [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
> disassociated! Shutting down.
> 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with
> remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
> has failed, address is now gated for [5000] ms. Reason is: 
> [Disassociated].
>
> How can I investigate further?
> Thanks
>


>>>
>>
>


Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-05 Thread Zhan Zhang
In addition, if you use HDP 2.2, you may need the following patch (if it is not
already in 1.2.1) to fix a system property issue.

https://github.com/apache/spark/pull/3409

You can follow the link below to set hdp.version in the java options.

http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/
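
As a rough sketch, this usually boils down to a couple of spark-defaults.conf
entries along these lines (the hdp.version value is a placeholder, and the exact
property names may differ between Spark versions, so double-check the tutorial):

spark.driver.extraJavaOptions   -Dhdp.version=2.2.x.x-xxxx
spark.yarn.am.extraJavaOptions  -Dhdp.version=2.2.x.x-xxxx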

Thanks.

Zhan Zhang

On Mar 5, 2015, at 11:09 AM, Marcelo Vanzin <van...@cloudera.com> wrote:

It seems from the excerpt below that your cluster is set up to use the
Yarn ATS, and the code is failing in that path. I think you'll need to
apply the following patch to your Spark sources if you want this to
work:

https://github.com/apache/spark/pull/3938

On Thu, Mar 5, 2015 at 10:04 AM, Todd Nist  wrote:
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:166)
   at
org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
   at
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:65)
   at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
   at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
   at org.apache.spark.SparkContext.(SparkContext.scala:348)

--
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Re: Writing to S3 and retrieving folder names

2015-03-05 Thread Mike Trienis
Please ignore my question; you can simply specify the root directory and it
looks like Redshift takes care of the rest.

copy mobile
from 's3://BUCKET_NAME/'
credentials 
json 's3://BUCKET_NAME/jsonpaths.json'

On Thu, Mar 5, 2015 at 3:33 PM, Mike Trienis 
wrote:

> Hi All,
>
> I am receiving data from AWS Kinesis using Spark Streaming and am writing
> the data collected in the dstream to s3 using output function:
>
> dstreamData.saveAsTextFiles("s3n://XXX:XXX@/")
>
> After the run the application for several seconds, I end up with a
> sequence of directories in S3 that look like [PREFIX]-1425597204000.
>
> At the same time I'd like to run a copy command on Redshift that pulls
> over the exported data. The problem is that I am not sure how to extract
> the folder names from the dstream object in order to construct the
> appropriate COPY command.
>
>
> https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.streaming.dstream.DStream
>
> Anyone have any ideas?
>
> Thanks, Mike.
>


Building Spark 1.3 for Scala 2.11 using Maven

2015-03-05 Thread Night Wolf
Hey guys,

Trying to build Spark 1.3 for Scala 2.11.

I'm running with the following Maven command:

-DskipTests -Dscala-2.11 clean install package


*Exception*:

[ERROR] Failed to execute goal on project spark-core_2.10: Could not
resolve dependencies for project
org.apache.spark:spark-core_2.10:jar:1.3.0-SNAPSHOT: The following
artifacts could not be resolved:
org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT,
org.apache.spark:spark-network-shuffle_2.11:jar:1.3.0-SNAPSHOT:
Failure to find
org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT in
http://repository.apache.org/snapshots was cached in the local
repository, resolution will not be reattempted until the update
interval of apache.snapshots has elapsed or updates are forced ->
[Help 1]


I see these warnings in the log before this error:


[INFO]
[INFO] 
[INFO] Building Spark Project Core 1.3.0-SNAPSHOT
[INFO] 
[WARNING]
The POM for org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT
is missing, no dependency information available[WARNING] The POM for
org.apache.spark:spark-network-shuffle_2.11:jar:1.3.0-SNAPSHOT is
missing, no dependency information available


Any ideas?


Re: Building Spark 1.3 for Scala 2.11 using Maven

2015-03-05 Thread Marcelo Vanzin
I've never tried it, but I'm pretty sure that at the very least you want
"-Pscala-2.11" (not -D).

On Thu, Mar 5, 2015 at 4:46 PM, Night Wolf  wrote:
> Hey guys,
>
> Trying to build Spark 1.3 for Scala 2.11.
>
> I'm running with the folllowng Maven command;
>
> -DskipTests -Dscala-2.11 clean install package
>
>
> Exception:
>
> [ERROR] Failed to execute goal on project spark-core_2.10: Could not resolve
> dependencies for project
> org.apache.spark:spark-core_2.10:jar:1.3.0-SNAPSHOT: The following artifacts
> could not be resolved:
> org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT,
> org.apache.spark:spark-network-shuffle_2.11:jar:1.3.0-SNAPSHOT: Failure to
> find org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT in
> http://repository.apache.org/snapshots was cached in the local repository,
> resolution will not be reattempted until the update interval of
> apache.snapshots has elapsed or updates are forced -> [Help 1]
>
>
> I see these warnings in the log before this error:
>
>
> [INFO]
> [INFO]
> 
> [INFO] Building Spark Project Core 1.3.0-SNAPSHOT
> [INFO]
> 
> [WARNING] The POM for
> org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT is missing, no
> dependency information available
> [WARNING] The POM for
> org.apache.spark:spark-network-shuffle_2.11:jar:1.3.0-SNAPSHOT is missing,
> no dependency information available
>
>
> Any ideas?



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark with data on NFS v HDFS

2015-03-05 Thread Tobias Pfeiffer
Hi,

On Thu, Mar 5, 2015 at 10:58 PM, Ashish Mukherjee <
ashish.mukher...@gmail.com> wrote:
>
> I understand Spark can be used with Hadoop or standalone. I have certain
> questions related to use of the correct FS for Spark data.
>
> What is the efficiency trade-off in feeding data to Spark from NFS v HDFS?
>

As I understand it, one performance advantage of using HDFS is that a
task is scheduled on a cluster node that already has its data on local disk,
so the tasks go to where the data is. In the case of NFS, all data
must first be fetched from the file server(s), so there is no such thing
as "data locality".

Tobias


Re: Building Spark 1.3 for Scala 2.11 using Maven

2015-03-05 Thread Marcelo Vanzin
Ah, and you may have to use dev/change-version-to-2.11.sh. (Again,
never tried compiling with scala 2.11.)
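
Putting the two suggestions together, the sequence would look roughly like this
(a sketch based on the build docs of that time, not verified here):

dev/change-version-to-2.11.sh
mvn -Pscala-2.11 -DskipTests clean install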

On Thu, Mar 5, 2015 at 4:52 PM, Marcelo Vanzin  wrote:
> I've never tried it, but I'm pretty sure in the very least you want
> "-Pscala-2.11" (not -D).
>
> On Thu, Mar 5, 2015 at 4:46 PM, Night Wolf  wrote:
>> Hey guys,
>>
>> Trying to build Spark 1.3 for Scala 2.11.
>>
>> I'm running with the folllowng Maven command;
>>
>> -DskipTests -Dscala-2.11 clean install package
>>
>>
>> Exception:
>>
>> [ERROR] Failed to execute goal on project spark-core_2.10: Could not resolve
>> dependencies for project
>> org.apache.spark:spark-core_2.10:jar:1.3.0-SNAPSHOT: The following artifacts
>> could not be resolved:
>> org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT,
>> org.apache.spark:spark-network-shuffle_2.11:jar:1.3.0-SNAPSHOT: Failure to
>> find org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT in
>> http://repository.apache.org/snapshots was cached in the local repository,
>> resolution will not be reattempted until the update interval of
>> apache.snapshots has elapsed or updates are forced -> [Help 1]
>>
>>
>> I see these warnings in the log before this error:
>>
>>
>> [INFO]
>> [INFO]
>> 
>> [INFO] Building Spark Project Core 1.3.0-SNAPSHOT
>> [INFO]
>> 
>> [WARNING] The POM for
>> org.apache.spark:spark-network-common_2.11:jar:1.3.0-SNAPSHOT is missing, no
>> dependency information available
>> [WARNING] The POM for
>> org.apache.spark:spark-network-shuffle_2.11:jar:1.3.0-SNAPSHOT is missing,
>> no dependency information available
>>
>>
>> Any ideas?
>
>
>
> --
> Marcelo



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Ted Yu
See the following thread about the 1.3.0 release:
http://search-hadoop.com/m/JW1q5hV8c4

Looks like the release is around the corner.

On Thu, Mar 5, 2015 at 3:26 PM, Cui Lin  wrote:

>   Hi, Ted,
>
>  Thanks for your reply. I noticed from the below link partitions.size
> will not work for checking empty RDD in streams. It seems that the problem
> can be solved in spark 1.3 which is no way to download at this time?
>
>  https://issues.apache.org/jira/browse/SPARK-5270
>  Best regards,
>
>  Cui Lin
>
>   From: Ted Yu 
> Date: Thursday, March 5, 2015 at 6:33 AM
> To: Akhil Das 
> Cc: Cui Lin , "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: How to parse Json formatted Kafka message in spark streaming
>
>   Cui:
> You can check messages.partitions.size to determine whether messages is
> an empty RDD.
>
>  Cheers
>
> On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das 
> wrote:
>
>>  When you use KafkaUtils.createStream with StringDecoders, it will
>> return String objects inside your messages stream. To access the elements
>> from the json, you could do something like the following:
>>
>>
>> val mapStream = messages.map(x=> {
>>val mapper = new ObjectMapper() with ScalaObjectMapper
>>   mapper.registerModule(DefaultScalaModule)
>>
>>mapper.readValue[Map[String,Any]](x)*.get("time")*
>> })
>>
>>
>>
>>  Thanks
>> Best Regards
>>
>> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin  wrote:
>>
>>>   Friends,
>>>
>>>   I'm trying to parse json formatted Kafka messages and then send back
>>> to cassandra.I have two problems:
>>>
>>>1. I got the exception below. How to check an empty RDD?
>>>
>>>  Exception in thread "main" java.lang.UnsupportedOperationException:
>>> empty collection
>>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>>  at scala.Option.getOrElse(Option.scala:120)
>>>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>>>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>>>
>>>  val messages = KafkaUtils.createStream[String, String, StringDecoder, 
>>> StringDecoder](…)
>>>
>>> messages.foreachRDD { rdd =>
>>>   val message:RDD[String] = rdd.map { y => y._2 }
>>>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>>>   sqlContext.sql("SELECT time,To FROM tempTable")
>>> .saveToCassandra(cassandra_keyspace, cassandra_table, 
>>> SomeColumns("key", "msg"))
>>> }
>>>
>>>
>>>  2. how to get all column names from json messages? I have hundreds of
>>> columns in the json formatted message.
>>>
>>>  Thanks for your help!
>>>
>>>
>>>
>>>
>>>  Best regards,
>>>
>>>  Cui Lin
>>>
>>
>>
>


Spark code development practice

2015-03-05 Thread Xi Shen
Hi,

I am new to Spark. I see that every Spark program has a main() function. I
wonder if I can run a Spark program directly, without using spark-submit.
I think it would be easier for early development and debugging.


Thanks,
David


Re: Spark code development practice

2015-03-05 Thread Stephen Boesch
Hi Xi,
  Yes,
  You can do the following:

val sc = new SparkContext("local[2]", "mptest")

// or .. val sc = new SparkContext("spark://master:7070", "mptest")

val fileDataRdd = sc.textFile("/path/to/dir")

val fileLines = fileDataRdd.take(100)


The key here - i.e. the answer to your specific question - is the creation
of the SparkContext: that is one of the things spark-submit does for
you.
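
A hedged variant of the same idea (the object name and path are placeholders):
leave the master out of the code entirely, so the same main() runs in the IDE with
a local master and later under spark-submit with --master yarn-client or a
standalone URL:

import org.apache.spark.{SparkConf, SparkContext}

object MyApp {
  def main(args: Array[String]): Unit = {
    // No setMaster here: supply it at launch time with --master,
    // or call conf.setMaster("local[2]") only for quick in-IDE runs.
    val conf = new SparkConf().setAppName("MyApp")
    val sc = new SparkContext(conf)
    sc.textFile("/path/to/dir").take(100).foreach(println)
    sc.stop()
  }
}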

2015-03-05 17:19 GMT-08:00 Xi Shen :

> Hi,
>
> I am new to Spark. I see every spark program has a main() function. I
> wonder if I can run the spark program directly, without using spark-submit.
> I think it will be easier for early development and debug.
>
>
> Thanks,
> David
>
>


Re: Spark code development practice

2015-03-05 Thread fightf...@163.com
Hi,

You can first set up a Scala IDE to develop and debug your Spark program,
say IntelliJ IDEA or Eclipse.

Thanks,
Sun.



fightf...@163.com
 
From: Xi Shen
Date: 2015-03-06 09:19
To: user@spark.apache.org
Subject: Spark code development practice
Hi,

I am new to Spark. I see every spark program has a main() function. I wonder if 
I can run the spark program directly, without using spark-submit. I think it 
will be easier for early development and debug.


Thanks,
David



Re: External Data Source in Spark

2015-03-05 Thread Michael Armbrust
>
> Currently we have implemented  External Data Source API and are able to
> push filters and projections.
>
> Could you provide some info on how perhaps the joins could be pushed to
> the original Data Source if both the data sources are from same database
> *.*
>

First a disclaimer: This is an experimental API that exposes internals that
are likely to change in between different Spark releases.  As a result,
most datasources should be written against the stable public API in
org.apache.spark.sql.sources.
We expose this mostly to get feedback on what optimizations we should add
to the stable API in order to get the best performance out of data sources.

We'll start with a simple artificial data source that just returns ranges
of consecutive integers.

/** A data source that returns ranges of consecutive integers in a column named `a`. */
case class SimpleRelation(
    start: Int,
    end: Int)(
    @transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  val schema = StructType('a.int :: Nil)
  def buildScan() = sqlContext.sparkContext.parallelize(start to end).map(Row(_))
}


Given this we can create tables:

sqlContext.baseRelationToDataFrame(SimpleRelation(1, 1)(sqlContext)).registerTempTable("smallTable")
sqlContext.baseRelationToDataFrame(SimpleRelation(1, 1000)(sqlContext)).registerTempTable("bigTable")


However, doing a join is pretty slow since we need to shuffle the big table
around for no reason:

sql("SELECT * FROM smallTable s JOIN bigTable b ON s.a = b.a").collect()
res3: Array[org.apache.spark.sql.Row] = Array([1,1])


This takes about 10 seconds on my cluster.  Clearly we can do better.  So
let's define special physical operators for the case when we are inner
joining two of these relations using equality. One will handle the case
when there is no overlap and the other when there is.  Physical operators
must extend SparkPlan and must return an RDD[Row] containing the answer
when execute() is called.

import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.SparkPlan

/** A join that just returns the pre-calculated overlap of two ranges of consecutive integers. */
case class OverlappingRangeJoin(leftOutput: Attribute, rightOutput: Attribute, start: Int, end: Int)
  extends SparkPlan {

  def output: Seq[Attribute] = leftOutput :: rightOutput :: Nil

  def execute(): org.apache.spark.rdd.RDD[Row] = {
    sqlContext.sparkContext.parallelize(start to end).map(i => Row(i, i))
  }

  def children: Seq[SparkPlan] = Nil
}

/** Used when a join is known to produce no results. */
case class EmptyJoin(output: Seq[Attribute]) extends SparkPlan {
  def execute(): org.apache.spark.rdd.RDD[Row] = {
    sqlContext.sparkContext.emptyRDD
  }

  def children: Seq[SparkPlan] = Nil
}

/** Finds cases where two sets of consecutive integer ranges are inner joined on equality. */
object SmartSimpleJoin extends Strategy with Serializable {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Find inner joins between two SimpleRelations where the condition is equality.
    case Join(l @ LogicalRelation(left: SimpleRelation), r @ LogicalRelation(right: SimpleRelation), Inner, Some(EqualTo(a, b))) =>
      // Check if the join condition is comparing `a` from each relation.
      if (a == l.output.head && b == r.output.head || a == r.output.head && b == l.output.head) {
        if ((left.start <= right.end) && (left.end >= right.start)) {
          OverlappingRangeJoin(
            l.output.head,
            r.output.head,
            math.max(left.start, right.start),
            math.min(left.end, right.end)) :: Nil
        } else {
          // Ranges don't overlap, join will be empty
          EmptyJoin(l.output.head :: r.output.head :: Nil) :: Nil
        }
      } else {
        // Join isn't between the columns output...
        // Let's just let the query planner handle this.
        Nil
      }
    case _ => Nil // Return an empty list if we don't know how to handle this plan.
  }
}


We can then add these strategies to the query planner through the
experimental hook.  Added strategies take precedence over built-in ones.

// Add the strategy to the query planner.
sqlContext.experimental.extraStrategies = SmartSimpleJoin :: Nil


sql("SELECT * FROM smallTable s JOIN bigTable b ON s.a = b.a").collect()
res4: Array[org.apache.spark.sql.Row] = Array([1,1])


Now our join returns in < 1 second.  For more advanced matching of joins
and their conditions you should look at the patterns that are available,
and the built-in join strategies.


Re: External Data Source in Spark

2015-03-05 Thread Michael Armbrust
One other caveat: While writing up this example I realized that we make
SparkPlan private and we are already packaging 1.3-RC3... So you'll need a
custom build of Spark for this to run. We'll fix this in the next release.

On Thu, Mar 5, 2015 at 5:26 PM, Michael Armbrust 
wrote:

> Currently we have implemented  External Data Source API and are able to
>> push filters and projections.
>>
>> Could you provide some info on how perhaps the joins could be pushed to
>> the original Data Source if both the data sources are from same database
>> *.*
>>
>
> First a disclaimer: This is an experimental API that exposes internals
> that are likely to change in between different Spark releases.  As a
> result, most datasources should be written against the stable public API in
> org.apache.spark.sql.sources
> .
> We expose this mostly to get feedback on what optimizations we should add
> to the stable API in order to get the best performance out of data sources.
>
> We'll start with a simple artificial data source that just returns ranges
> of consecutive integers.
>
> /** A data source that returns ranges of consecutive integers in a column 
> named `a`. */case class SimpleRelation(
> start: Int,
> end: Int)(
> @transient val sqlContext: SQLContext)
>   extends BaseRelation with TableScan {
>
>   val schema = StructType('a.int :: Nil)
>   def buildScan() = sqlContext.sparkContext.parallelize(start to 
> end).map(Row(_))
> }
>
>
> Given this we can create tables:
>
> sqlContext.baseRelationToDataFrame(SimpleRelation(1, 
> 1)(sqlContext)).registerTempTable("smallTable")
> sqlContext.baseRelationToDataFrame(SimpleRelation(1, 
> 1000)(sqlContext)).registerTempTable("bigTable")
>
>
> However, doing a join is pretty slow since we need to shuffle the big
> table around for no reason:
>
> sql("SELECT * FROM smallTable s JOIN bigTable b ON s.a = b.a").collect()
> res3: Array[org.apache.spark.sql.Row] = Array([1,1])
>
>
> This takes about 10 seconds on my cluster.  Clearly we can do better.  So
> let's define special physical operators for the case when we are inner
> joining two of these relations using equality. One will handle the case
> when there is no overlap and the other when there is.  Physical operators
> must extend SparkPlan and must return an RDD[Row] containing the answer
> when execute() is called.
>
> import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo}import 
> org.apache.spark.sql.catalyst.plans._import 
> org.apache.spark.sql.catalyst.plans.logical._import 
> org.apache.spark.sql.execution.SparkPlan
> /** A join that just returns the pre-calculated overlap of two ranges of 
> consecutive integers. */case class OverlappingRangeJoin(leftOutput: 
> Attribute, rightOutput: Attribute, start: Int, end: Int) extends SparkPlan {
>   def output: Seq[Attribute] = leftOutput :: rightOutput :: Nil
>
>   def execute(): org.apache.spark.rdd.RDD[Row] = {
> sqlContext.sparkContext.parallelize(start to end).map(i => Row(i, i))
>   }
>
>   def children: Seq[SparkPlan] = Nil
> }
> /** Used when a join is known to produce no results. */case class 
> EmptyJoin(output: Seq[Attribute]) extends SparkPlan {
>   def execute(): org.apache.spark.rdd.RDD[Row] = {
> sqlContext.sparkContext.emptyRDD
>   }
>
>   def children: Seq[SparkPlan] = Nil
> }
> /** Finds cases where two sets of consecutive integer ranges are inner joined 
> on equality. */object SmartSimpleJoin extends Strategy with Serializable {
>   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
> // Find inner joins between two SimpleRelations where the condition is 
> equality.
> case Join(l @ LogicalRelation(left: SimpleRelation), r @ 
> LogicalRelation(right: SimpleRelation), Inner, Some(EqualTo(a, b))) =>
>   // Check if the join condition is comparing `a` from each relation.
>   if (a == l.output.head && b == r.output.head || a == r.output.head && b 
> == l.output.head) {
> if ((left.start <= right.end) && (left.end >= right.start)) {
>   OverlappingRangeJoin(
> l.output.head,
> r.output.head,
> math.max(left.start, right.start),
> math.min(left.end, right.end)) :: Nil
> } else {
>   // Ranges don't overlap, join will be empty
>   EmptyJoin(l.output.head :: r.output.head :: Nil) :: Nil
> }
>   } else {
> // Join isn't between the the columns output...
> // Let's just let the query planner handle this.
> Nil
>   }
> case _ => Nil // Return an empty list if we don't know how to handle this 
> plan.
>   }
> }
>
>
> We can then add these strategies to the query planner through the
> experimental hook.  Added strategies take precedence over built-in ones.
>
> // Add the strategy to the query planner.
> sqlContext.experimental.extraStrategies = SmartSimpleJoin :: Nil
>

Re: Spark code development practice

2015-03-05 Thread Koen Vantomme
Use the spark-shell command and the shell will open.
Type :paste, then paste your code, and finish with Ctrl-D.

To open spark-shell:
cd spark/bin
./spark-shell

Sent from my iPhone

> On 6 Mar 2015, at 02:28, "fightf...@163.com"  wrote:
> 
> Hi,
> 
> You can first establish a scala ide to develop and debug your spark program, 
> lets say, intellij idea or eclipse.
> 
> Thanks,
> Sun.
> 
> fightf...@163.com
>  
> From: Xi Shen
> Date: 2015-03-06 09:19
> To: user@spark.apache.org
> Subject: Spark code development practice
> Hi,
> 
> I am new to Spark. I see every spark program has a main() function. I wonder 
> if I can run the spark program directly, without using spark-submit. I think 
> it will be easier for early development and debug.
> 
> 
> Thanks,
> David
> 


spark-ec2 script problems

2015-03-05 Thread roni
Hi,
I used the spark-ec2 script to create an EC2 cluster.

Now I am trying to copy data from S3 into HDFS.
I am doing this:

root@ip-172-31-21-160 ephemeral-hdfs]$ bin/hadoop distcp
s3:///home/mydata/small.sam
hdfs://ec2-52-11-148-31.us-west-2.compute.amazonaws.com:9010/data1

and I get following error -

2015-03-06 01:39:27,299 INFO  tools.DistCp (DistCp.java:run(109)) - Input
Options: DistCpOptions{atomicCommit=false, syncFolder=false,
deleteMissing=false, ignoreFailures=false, maxMaps=20,
sslConfigurationFile='null', copyStrategy='uniformsize',
sourceFileListing=null, sourcePaths=[s3:///home/mydata/small.sam],
targetPath=hdfs://
ec2-52-11-148-31.us-west-2.compute.amazonaws.com:9010/data1}
2015-03-06 01:39:27,585 INFO  mapreduce.Cluster
(Cluster.java:initialize(114)) - Failed to use
org.apache.hadoop.mapred.LocalClientProtocolProvider due to error: Invalid
"mapreduce.jobtracker.address" configuration value for LocalJobRunner : "
ec2-52-11-148-31.us-west-2.compute.amazonaws.com:9001"
2015-03-06 01:39:27,585 ERROR tools.DistCp (DistCp.java:run(126)) -
Exception encountered
java.io.IOException: Cannot initialize Cluster. Please check your
configuration for mapreduce.framework.name and the correspond server
addresses.
at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121)
at org.apache.hadoop.mapreduce.Cluster.(Cluster.java:83)
at org.apache.hadoop.mapreduce.Cluster.(Cluster.java:76)
at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352)
at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146)
at org.apache.hadoop.tools.DistCp.run(DistCp.java:118)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.tools.DistCp.main(DistCp.java:374)

I tried running start-all.sh, start-dfs.sh and start-yarn.sh.

what should I do ?
Thanks
-roni


SparkSQL JSON array support

2015-03-05 Thread Justin Pihony
Are there any plans to support JSON arrays more fully? Take for example:

val myJson =
sqlContext.jsonRDD(List("""{"foo":[{"bar":1},{"baz":2}]}"""))
myJson.registerTempTable("JsonTest")

I would like a way to pull out parts of the array data based on a key

sql("""SELECT foo["bar"] FROM JsonTest""") //projects only the object
with bar, the rest would be null
 
I could even work around this if there was some way to access the key name
from the SchemaRDD:

myJson.filter(x=>x(0).asInstanceOf[Seq[Row]].exists(y=>y.key == "bar"))
.map(x=>x(0).asInstanceOf[Seq[Row]].filter(y=>y.key == "bar")) 
//This does the same as above, except also filtering out those without a
bar key

This is the closest suggestion I could find thus far,
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView
which still does not solve the problem of pulling out the keys.

I tried with a UDF also, but could not currently make that work either.

If there isn't anything in the works, then would it be appropriate to create
a ticket for this?

Thanks,
Justin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-JSON-array-support-tp21939.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkSQL JSON array support

2015-03-05 Thread Michael Armbrust
You can do what you want with LATERAL VIEW explode, but what seems to be missing is
that jsonRDD converts JSON objects into structs (fixed keys with a fixed
order), and fields in a struct are accessed using a `.`

val myJson = sqlContext.jsonRDD(sc.parallelize("""{"foo":[{"bar":1},{"baz":2}]}""" :: Nil))
myJson.registerTempTable("JsonTest")
val result = sql("SELECT f.bar FROM JsonTest LATERAL VIEW explode(foo) a AS f").collect()

myJson: org.apache.spark.sql.DataFrame = [foo: array<struct<bar:bigint,baz:bigint>>]
result: Array[org.apache.spark.sql.Row] = Array([1], [null])


In Spark 1.3 you can also hint to jsonRDD that you'd like the json objects
converted into Maps (non-uniform keys) instead of structs, by manually
specifying the schema of your JSON.

import org.apache.spark.sql.types._

val schema =
  StructType(
    StructField("foo", ArrayType(MapType(StringType, IntegerType))) :: Nil)

sqlContext.jsonRDD(sc.parallelize("""{"foo":[{"bar":1},{"baz":2}]}""" :: Nil), schema).registerTempTable("jsonTest")

val withSql = sql("SELECT a FROM jsonTest LATERAL VIEW explode(foo) a AS a WHERE a['bar'] IS NOT NULL").collect()

val withSpark = sql("SELECT a FROM jsonTest LATERAL VIEW explode(foo) a AS a").rdd.filter {
  case Row(a: Map[String, Int]) if a.contains("bar") => true
  case _: Row => false
}.collect()

schema: org.apache.spark.sql.types.StructType = StructType(StructField(foo,ArrayType(MapType(StringType,IntegerType,true),true),true))
withSql: Array[org.apache.spark.sql.Row] = Array([Map(bar -> 1)])
withSpark: Array[org.apache.spark.sql.Row] = Array([Map(bar -> 1)])


Michael

On Thu, Mar 5, 2015 at 6:11 PM, Justin Pihony 
wrote:

> Is there any plans of supporting JSON arrays more fully? Take for example:
>
> val myJson =
> sqlContext.jsonRDD(List("""{"foo":[{"bar":1},{"baz":2}]}"""))
> myJson.registerTempTable("JsonTest")
>
> I would like a way to pull out parts of the array data based on a key
>
> sql("""SELECT foo["bar"] FROM JsonTest""") //projects only the object
> with bar, the rest would be null
>
> I could even work around this if there was some way to access the key name
> from the SchemaRDD:
>
> myJson.filter(x=>x(0).asInstanceOf[Seq[Row]].exists(y=>y.key == "bar"))
> .map(x=>x(0).asInstanceOf[Seq[Row]].filter(y=>y.key == "bar"))
> //This does the same as above, except also filtering out those without
> a
> bar key
>
> This is the closest suggestion I could find thus far,
> <
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView
> >
> which still does not solve the problem of pulling out the keys.
>
> I tried with a UDF also, but could not currently make that work either.
>
> If there isn't anything in the works, then would it be appropriate to
> create
> a ticket for this?
>
> Thanks,
> Justin
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-JSON-array-support-tp21939.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark code development practice

2015-03-05 Thread Xi Shen
Thanks guys, this is very useful :)

@Stephen, I know spark-shell will create a SparkContext for me. But I don't
understand why we still need to do "new SparkContext(...)" in our code.
Shouldn't we get it from somewhere, e.g. "SparkContext.get"?

Another question: if I want my Spark code to run on YARN later, how should
I create the SparkContext? Or can I just specify "--master yarn" on the command
line?


Thanks,
David


On Fri, Mar 6, 2015 at 12:38 PM Koen Vantomme 
wrote:

> use the spark-shell command and the shell will open
> type :paste abd then paste your code, after control-d
>
> open spark-shell:
> sparks/bin
> ./spark-shell
>
> Sent from my iPhone
>
> On 6 Mar 2015, at 02:28, "fightf...@163.com"  wrote:
>
> Hi,
>
> You can first establish a scala ide to develop and debug your spark
> program, lets say, intellij idea or eclipse.
>
> Thanks,
> Sun.
>
> --
> fightf...@163.com
>
>
> *From:* Xi Shen 
> *Date:* 2015-03-06 09:19
> *To:* user@spark.apache.org
> *Subject:* Spark code development practice
> Hi,
>
> I am new to Spark. I see every spark program has a main() function. I
> wonder if I can run the spark program directly, without using spark-submit.
> I think it will be easier for early development and debug.
>
>
> Thanks,
> David
>
>


Re: Training Random Forest

2015-03-05 Thread Xiangrui Meng
We don't support warm starts or online updates for decision trees. So
if you call train twice, only the second dataset is used for training.
-Xiangrui
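
A sketch of the practical workaround (MLlib's RandomForest with RDD[LabeledPoint]
inputs; not an incremental update, just a full retrain over the combined data, and
the hyperparameters below are placeholders):

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.rdd.RDD

def retrain(oldData: RDD[LabeledPoint], newData: RDD[LabeledPoint]) =
  // Every call builds the forest from scratch, so pass in all the data
  // the model should reflect, old and new.
  RandomForest.trainClassifier(
    oldData.union(newData),
    2,                 // numClasses
    Map[Int, Int](),   // categoricalFeaturesInfo
    100,               // numTrees
    "auto",            // featureSubsetStrategy
    "gini",            // impurity
    5,                 // maxDepth
    32)                // maxBins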

On Thu, Mar 5, 2015 at 12:31 PM, drarse  wrote:
> I am testing the Random Forest in Spark, but I have a question... If I train
> for the second time, will update the decision trees created or these are
> created anew ?. That is, does the system will continually learning for each
> dataset or only the first?
>
> Thanks for everything
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Training-Random-Forest-tp21935.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Construct model matrix from SchemaRDD automatically

2015-03-05 Thread Wush Wu
Dear all,

I am a new spark user from R.

After exploring the SchemaRDD, I notice that it is similar to R's data.frame.
Is there a feature like `model.matrix` in R that converts a SchemaRDD to a model
matrix automatically according to the column types, without explicitly converting
the columns one by one?

Thanks,
Wush


Re: Construct model matrix from SchemaRDD automatically

2015-03-05 Thread Evan R. Sparks
Hi Wush,

I'm CC'ing user@spark.apache.org (which is the new list) and BCC'ing
u...@spark.incubator.apache.org.

In Spark 1.3, schemaRDD is in fact being renamed to DataFrame (see:
https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
)

As for a "model.matrix", you might have a look at the new pipelines API in
spark 1.2 (to be further improved in 1.3) which provides facilities for
repeatable data transformation as input to ML algorithms. That said -
something to handle the case of automatically one-hot encoding all the
categorical variables in a DataFrame might be a welcome addition.
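
As a rough illustration of that missing piece (plain RDD code with made-up record
and column names, assuming an existing SparkContext sc; not an existing API):

case class Record(category: String, value: Double)

val data = sc.parallelize(Seq(Record("a", 1.0), Record("b", 2.0), Record("a", 3.0)))

// Index the distinct categories, then expand each record into 0/1 indicator
// columns plus the numeric column -- essentially a hand-rolled model.matrix.
val index = data.map(_.category).distinct().collect().sorted.zipWithIndex.toMap
val modelMatrix = data.map { r =>
  val row = Array.fill(index.size)(0.0)
  row(index(r.category)) = 1.0
  row :+ r.value
}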

- Evan

On Thu, Mar 5, 2015 at 8:43 PM, Wush Wu  wrote:

> Dear all,
>
> I am a new spark user from R.
>
> After exploring the schemaRDD, I notice that it is similar to data.frame.
> Is there a feature like `model.matrix` in R to convert schemaRDD to model
> matrix automatically according to the type without explicitly converting
> them one by one?
>
> Thanks,
> Wush
>
>
>
>
>
>


why my YoungGen GC takes so long time?

2015-03-05 Thread lisendong
I found that some of my tasks spend a very long time in YoungGen GC. I set the
young gen size to about 1.5G, so I wonder why it takes so long.
Not all the tasks take this long, only about 1% of them...


180.426: [GC [PSYoungGen: 9916105K->1676785K(14256640K)]
26201020K->18690057K(53403648K), 17.3581500 secs] [Times: user=104.48
sys=152.56, real=17.36 secs] 
198.986: [GC [PSYoungGen: 10724337K->1664837K(14881280K)]
27737609K->20205003K(54028288K), 55.0331460 secs] [Times: user=47.67
sys=776.67, real=55.02 secs] 
255.339: [GC [PSYoungGen: 11605317K->1550792K(14632960K)]
30145483K->21580959K(53779968K), 187.7893060 secs] [Times: user=110.10
sys=2704.33, real=187.76 secs] 
444.569: [GC [PSYoungGen: 11491272K->988109K(15197696K)]
31521439K->22370228K(54344704K), 366.2677820 secs] [Times: user=78.76
sys=5193.95, real=366.21 secs] 
812.450: [GC [PSYoungGen: 11704781K->1087351K(15092736K)]
33086900K->23295623K(54239744K), 163.0328770 secs] [Times: user=97.71
sys=2134.11, real=163.01 secs] 
977.207: [GC [PSYoungGen: 11804023K->1058228K(15470592K)]
34012295K->24294801K(54617600K), 176.5011980 secs] [Times: user=106.06
sys=1700.11, real=176.47 secs] 
1155.439: [GC [PSYoungGen: 12288948K->1248832K(15333888K)]
35525521K->25495802K(54480896K), 123.5496760 secs] [Times: user=59.09
sys=940.54, real=123.53 secs] 
1280.796: [GC [PSYoungGen: 12479552K->1097796K(15785472K)]
36726522K->26527988K(54932480K), 157.1727550 secs] [Times: user=11.14
sys=1376.93, real=157.15 secs] 
1439.923: [GC [PSYoungGen: 12953668K->1178863K(15644160K)]
38383860K->27637944K(54791168K), 119.3690200 secs] [Times: user=107.54
sys=937.40, real=119.35 secs] 
1561.236: [GC [PSYoungGen: 13034735K->1190849K(16093696K)]
39493816K->28787207K(55240704K), 159.1091770 secs] [Times: user=237.16
sys=1653.59, real=159.08 secs] 
1722.406: [GC [PSYoungGen: 13651905K->1245193K(15941120K)]
41248263K->29995801K(55088128K), 168.0231330 secs] [Times: user=81.20
sys=2203.72, real=168.00 secs] 
1892.731: [GC [PSYoungGen: 13706249K->1161132K(16381952K)]
42456857K->31117445K(55528960K), 162.0598100 secs] [Times: user=126.67
sys=1689.32, real=162.04 secs] 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-my-YoungGen-GC-takes-so-long-time-tp21941.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-ec2 script problems

2015-03-05 Thread Akhil Das
It works fine for me with the script that comes with the 1.2.0 release.

Here's a few things which you can try:

- Add your s3 credentials to the core-site.xml

  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>ID</value>
  </property>
  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>SECRET</value>
  </property>

- Do a jps and see all services are up and running (Namenode,
SecondaryNamenode, Datanodes etc.)

I think the default hdfs port that comes with spark-ec2 is 9000. You can
check that in your core-site.xml file
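
With the credentials in place, a distcp along these lines should work (bucket and
namenode host are placeholders; match the port to whatever fs.default.name says):

bin/hadoop distcp s3://YOUR_BUCKET/home/mydata/small.sam hdfs://NAMENODE_HOST:9000/data1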



Thanks
Best Regards

On Fri, Mar 6, 2015 at 7:14 AM, roni  wrote:

> Hi ,
>  I used spark-ec2 script to create ec2 cluster.
>
>  Now I am trying copy data from s3 into hdfs.
> I am doing this
> *root@ip-172-31-21-160 ephemeral-hdfs]$ bin/hadoop distcp
> s3:///home/mydata/small.sam
> hdfs://ec2-52-11-148-31.us-west-2.compute.amazonaws.com:9010/data1
> *
>
> and I get following error -
>
> 2015-03-06 01:39:27,299 INFO  tools.DistCp (DistCp.java:run(109)) - Input
> Options: DistCpOptions{atomicCommit=false, syncFolder=false,
> deleteMissing=false, ignoreFailures=false, maxMaps=20,
> sslConfigurationFile='null', copyStrategy='uniformsize',
> sourceFileListing=null, sourcePaths=[s3:///home/mydata/small.sam],
> targetPath=hdfs://
> ec2-52-11-148-31.us-west-2.compute.amazonaws.com:9010/data1}
> 2015-03-06 01:39:27,585 INFO  mapreduce.Cluster
> (Cluster.java:initialize(114)) - Failed to use
> org.apache.hadoop.mapred.LocalClientProtocolProvider due to error: Invalid
> "mapreduce.jobtracker.address" configuration value for LocalJobRunner : "
> ec2-52-11-148-31.us-west-2.compute.amazonaws.com:9001"
> 2015-03-06 01:39:27,585 ERROR tools.DistCp (DistCp.java:run(126)) -
> Exception encountered
> java.io.IOException: Cannot initialize Cluster. Please check your
> configuration for mapreduce.framework.name and the correspond server
> addresses.
> at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:121)
> at org.apache.hadoop.mapreduce.Cluster.(Cluster.java:83)
> at org.apache.hadoop.mapreduce.Cluster.(Cluster.java:76)
> at org.apache.hadoop.tools.DistCp.createMetaFolderPath(DistCp.java:352)
> at org.apache.hadoop.tools.DistCp.execute(DistCp.java:146)
> at org.apache.hadoop.tools.DistCp.run(DistCp.java:118)
> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
> at org.apache.hadoop.tools.DistCp.main(DistCp.java:374)
>
> I tried doing start-all.sh , start-dfs.sh  and start-yarn.sh
>
> what should I do ?
> Thanks
> -roni
>
>


Re: Managing permissions when saving as text file

2015-03-05 Thread Akhil Das
Why not set up HDFS?

Thanks
Best Regards

On Thu, Mar 5, 2015 at 4:03 PM, didmar  wrote:

> Hi,
>
> I'm having a problem involving file permissions on the local filesystem.
>
> On a first machine, I have two different users :
> - launcher, which launches my job from an uber jar file
> - spark, which runs the master
> On a second machine, I have a user spark (same uid/gid as the other) which
> runs the worker
>
> Results are written in a shared folder /home/spark/output/ (same path for
> both machines) which is owned by spark and has 777 permissions
>
> When I run a job that saves a text file to /home/spark/output/result/,
> launcher creates the result folder and subfolders with 775 permissions.
> The problem is that the worker, which runs as user spark, cannot write the
> results:
>
>
>
> I tried to add the sticky bit to /home/spark/output/, but this did not
> suffice : it creates a part-0, starts filling it but then the following
> error occurs
>
>
>
> Is there a way to force Spark (the driver part I suppose ?) to create all
> these folders with 777 permissions instead of 775 ? Or maybe there is
> another way ?
>
> Thanks,
> Didier
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Managing-permissions-when-saving-as-text-file-tp21928.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


spark-stream programme failed on yarn-client

2015-03-05 Thread fenghaixiong
Hi all, 

   
I'm trying to write a Spark Streaming program, so I read the Spark online documentation.
Following the documentation, I wrote the program like this:

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object SparkStreamTest {
  def main(args: Array[String]) {
val conf = new SparkConf()
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" ")) 
val pairs = words.map(word => (word, 1)) 
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print() 
ssc.start() // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
  }

}



For testing, I first start listening on a port with:
 nc -lk  

and then I submit the job with:
spark-submit  --master local[2] --class com.nd.hxf.SparkStreamTest 
spark-test-tream-1.0-SNAPSHOT-job.jar  localhost 

Everything is okay.


But when I run it on YARN like this:
spark-submit  --master yarn-client --class com.nd.hxf.SparkStreamTest 
spark-test-tream-1.0-SNAPSHOT-job.jar  localhost 

it waits for a long time and repeatedly outputs some messages. A part of the output
looks like this:







15/03/06 15:30:24 INFO YarnClientSchedulerBackend: SchedulerBackend is ready 
for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 
3(ms)
15/03/06 15:30:24 INFO ReceiverTracker: ReceiverTracker started
15/03/06 15:30:24 INFO ForEachDStream: metadataCleanupDelay = -1
15/03/06 15:30:24 INFO ShuffledDStream: metadataCleanupDelay = -1
15/03/06 15:30:24 INFO MappedDStream: metadataCleanupDelay = -1
15/03/06 15:30:24 INFO FlatMappedDStream: metadataCleanupDelay = -1
15/03/06 15:30:24 INFO SocketInputDStream: metadataCleanupDelay = -1
15/03/06 15:30:24 INFO SocketInputDStream: Slide time = 1000 ms
15/03/06 15:30:24 INFO SocketInputDStream: Storage level = StorageLevel(false, 
false, false, false, 1)
15/03/06 15:30:24 INFO SocketInputDStream: Checkpoint interval = null
15/03/06 15:30:24 INFO SocketInputDStream: Remember duration = 1000 ms
15/03/06 15:30:24 INFO SocketInputDStream: Initialized and validated 
org.apache.spark.streaming.dstream.SocketInputDStream@b01c5f8
15/03/06 15:30:24 INFO FlatMappedDStream: Slide time = 1000 ms
15/03/06 15:30:24 INFO FlatMappedDStream: Storage level = StorageLevel(false, 
false, false, false, 1)
15/03/06 15:30:24 INFO FlatMappedDStream: Checkpoint interval = null
15/03/06 15:30:24 INFO FlatMappedDStream: Remember duration = 1000 ms
15/03/06 15:30:24 INFO FlatMappedDStream: Initialized and validated 
org.apache.spark.streaming.dstream.FlatMappedDStream@6bd47453
15/03/06 15:30:24 INFO MappedDStream: Slide time = 1000 ms
15/03/06 15:30:24 INFO MappedDStream: Storage level = StorageLevel(false, 
false, false, false, 1)
15/03/06 15:30:24 INFO MappedDStream: Checkpoint interval = null
15/03/06 15:30:24 INFO MappedDStream: Remember duration = 1000 ms
15/03/06 15:30:24 INFO MappedDStream: Initialized and validated 
org.apache.spark.streaming.dstream.MappedDStream@941451f
15/03/06 15:30:24 INFO ShuffledDStream: Slide time = 1000 ms
15/03/06 15:30:24 INFO ShuffledDStream: Storage level = StorageLevel(false, 
false, false, false, 1)
15/03/06 15:30:24 INFO ShuffledDStream: Checkpoint interval = null
15/03/06 15:30:24 INFO ShuffledDStream: Remember duration = 1000 ms
15/03/06 15:30:24 INFO ShuffledDStream: Initialized and validated 
org.apache.spark.streaming.dstream.ShuffledDStream@42eba6ee
15/03/06 15:30:24 INFO ForEachDStream: Slide time = 1000 ms
15/03/06 15:30:24 INFO ForEachDStream: Storage level = StorageLevel(false, 
false, false, false, 1)
15/03/06 15:30:24 INFO ForEachDStream: Checkpoint interval = null
15/03/06 15:30:24 INFO ForEachDStream: Remember duration = 1000 ms
15/03/06 15:30:24 INFO ForEachDStream: Initialized and validated 
org.apache.spark.streaming.dstream.ForEachDStream@48d166b5
15/03/06 15:30:24 INFO SparkContext: Starting job: start at 
SparkStreamTest.scala:21
15/03/06 15:30:24 INFO RecurringTimer: Started timer for JobGenerator at time 
1425627025000
15/03/06 15:30:24 INFO JobGenerator: Started JobGenerator at 1425627025000 ms
15/03/06 15:30:24 INFO JobScheduler: Started JobScheduler
15/03/06 15:30:24 INFO DAGScheduler: Registering RDD 2 (start at 
SparkStreamTest.scala:21)
15/03/06 15:30:24 INFO DAGScheduler: Got job 0 (start at 
SparkStreamTest.scala:21) with 20 output partitions (allowLocal=false)
15/03/06 15:30:24 INFO DAGScheduler: Final stage: Stage 0(start at 
SparkStreamTest.scala:21)
15/03/06 15:30:24 INFO DAGScheduler: Parents of final stage: List(Stage 1)
15/03/06 15:30:24 INFO DAGScheduler: Missing parents: List(Stage 1)
15/03/06 15:30:24 INFO DAGScheduler: S

Compile Spark with Maven & Zinc Scala Plugin

2015-03-05 Thread Night Wolf
Hey,

Trying to build the latest Spark 1.3 with Maven using:

-DskipTests clean install package

But I'm getting errors with zinc; in the logs I see:

[INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ spark-network-common_2.11 ---

...

[error] Required file not found: sbt-interface.jar
[error] See zinc -help for information about locating necessary files


Any ideas?


Re: Compile Spark with Maven & Zinc Scala Plugin

2015-03-05 Thread ??
Try it with: mvn -DskipTests -Pscala-2.11 clean install package

Re: Managing permissions when saving as text file

2015-03-05 Thread didmar
OK, I solved this problem by:
- changing the primary group of launcher to spark
- adding "umask 002" in launcher's .bashrc and spark's init.d script



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Managing-permissions-when-saving-as-text-file-tp21928p21943.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org