Re: Unable to start Spark 1.3 after building: java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2014-12-18 Thread Sean Owen
Adding a hadoop-2.6 profile is not necessary. Use hadoop-2.4, which
already exists and is intended for 2.4+. In fact, the profile declaration
below is missing settings that Hadoop 2 needs.
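
For reference, building against Hadoop 2.6 with the existing profile would look
roughly like this (a sketch, not an official recipe):

    mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package

To get a distribution tgz like the pre-built one on the website, the
make-distribution.sh script in the source root can produce it, along these
lines (the --name label is arbitrary, and the flags are worth double-checking
against the script's usage text):

    ./make-distribution.sh --tgz --name hadoop2.6 -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0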

On Thu, Dec 18, 2014 at 3:46 AM, Kyle Lin  wrote:
> Hi there
>
> The following are my steps, and I got the same exception as Daniel's.
> Another question: how can I build a tgz file like the pre-built file I
> downloaded from the official website?
>
> 1. download trunk from git.
>
> 2. add following lines in pom.xml
> + <profile>
> +   <id>hadoop-2.6</id>
> +   <properties>
> +     <hadoop.version>2.6.0</hadoop.version>
> +     <protobuf.version>2.5.0</protobuf.version>
> +     <jets3t.version>0.9.0</jets3t.version>
> +   </properties>
> + </profile>
>
> 3. run "mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean
> package"
>
> 4. in $SPARK_HOME, run following command
> ./bin/spark-submit --master yarn-cluster --class
> org.apache.spark.examples.SparkPi lib/spark-examples*.jar 10
>
> Kyle
>
>
> 2014-12-18 2:24 GMT+08:00 Daniel Haviv :
>>
>> Thanks for your replies.
>> I was building spark from trunk.
>>
>> Daniel
>>
>> On 17 Dec 2014, at 19:49, Nicholas Chammas 
>> wrote:
>>
>> Thanks for the correction, Sean. Do the docs need to be updated on this
>> point, or is it safer for now just to note 2.4 specifically?
>>
>> On Wed Dec 17 2014 at 5:54:53 AM Sean Owen  wrote:
>>>
>>> Spark works fine with 2.4 *and later*. The docs don't mean to imply
>>> 2.4 is the last supported version.
>>>
>>> On Wed, Dec 17, 2014 at 10:19 AM, Nicholas Chammas
>>>  wrote:
>>> > Spark 1.3 does not exist. Spark 1.2 hasn't been released just yet.
>>> > Which
>>> > version of Spark did you mean?
>>> >
>>> > Also, from what I can see in the docs, I believe the latest version of
>>> > Hadoop that Spark supports is 2.4, not 2.6.
>>> >
>>> > Nick
>>> >
>>> > On Wed Dec 17 2014 at 2:09:56 AM Kyle Lin 
>>> > wrote:
>>> >>
>>> >>
>>> >> I also got the same problem..
>>> >>
>>> >> 2014-12-09 22:58 GMT+08:00 Daniel Haviv :
>>> >>>
>>> >>> Hi,
>>> >>> I've built spark 1.3 with hadoop 2.6 but when I startup the
>>> >>> spark-shell I
>>> >>> get the following exception:
>>> >>>
>>> >>> 14/12/09 06:54:24 INFO server.AbstractConnector: Started
>>> >>> SelectChannelConnector@0.0.0.0:4040
>>> >>> 14/12/09 06:54:24 INFO util.Utils: Successfully started service
>>> >>> 'SparkUI'
>>> >>> on port 4040.
>>> >>> 14/12/09 06:54:24 INFO ui.SparkUI: Started SparkUI at
>>> >>> http://hdname:4040
>>> >>> 14/12/09 06:54:25 INFO impl.TimelineClientImpl: Timeline service
>>> >>> address:
>>> >>> http://0.0.0.0:8188/ws/v1/timeline/
>>> >>> java.lang.NoClassDefFoundError:
>>> >>> org/codehaus/jackson/map/deser/std/StdDeserializer
>>> >>> at java.lang.ClassLoader.defineClass1(Native Method)
>>> >>> at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
>>> >>> at
>>> >>>
>>> >>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>> >>>
>>> >>> Any idea why ?
>>> >>>
>>> >>> Thanks,
>>> >>> Daniel
>>> >>>
>>> >>>
>>> >

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Can Spark 1.0.2 run on CDH-4.3.0 with yarn? And Will Spark 1.2.0 support CDH5.1.2 with yarn?

2014-12-18 Thread Canoe
I was not able to compile the Spark 1.1.0 source code for CDH 4.3.0 with YARN.
Does it support CDH 4.3.0 with YARN?
And will Spark 1.2.0 support CDH 5.1.2?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-Spark-1-0-2-run-on-CDH-4-3-0-with-yarn-And-Will-Spark-1-2-0-support-CDH5-1-2-with-yarn-tp20760.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary

2014-12-18 Thread Sean Owen
Well, it's always a good idea to use matched binary versions. Here it
is more acutely necessary. You can use a pre-built binary -- if you
use it to compile and also run. Why does it not make sense to publish
artifacts?

Not sure what you mean about core vs assembly, as the assembly
contains all of the modules. You don't literally need the same jar
file.
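
As a concrete illustration of the 'provided' approach Shivaram suggests below,
the sbt dependency would look roughly like this (a sketch, not code from the
original thread):

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.1.1" % "provided"
    )

With the dependency marked provided, the app compiles against the published
artifact but picks up the Spark classes from the cluster's assembly jar at
runtime, which gives the matched-binaries property described above.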

On Thu, Dec 18, 2014 at 3:20 AM, Sun, Rui  wrote:
> Not using spark-submit. The App directly communicates with the Spark cluster
> in standalone mode.
>
>
>
> If I mark the Spark dependency as 'provided’, then a spark-core .jar
> elsewhere must be pointed to in the CLASSPATH. However, the pre-built Spark
> binary only has an assembly jar, not individual module jars. So you
> don’t have a way to point to a module jar that is the same binary as
> the one in the pre-built Spark binary.
>
>
>
> Maybe the Spark distribution should contain not only the assembly jar but
> also individual module jars. Any opinion?
>
>
>
> From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
> Sent: Thursday, December 18, 2014 2:20 AM
> To: Sean Owen
> Cc: Sun, Rui; user@spark.apache.org
> Subject: Re: weird bytecode incompatability issue between spark-core jar
> from mvn repo and official spark prebuilt binary
>
>
>
> Just to clarify, are you running the application using spark-submit after
> packaging with sbt package ? One thing that might help is to mark the Spark
> dependency as 'provided' as then you shouldn't have the Spark classes in
> your jar.
>
>
>
> Thanks
>
> Shivaram
>
>
>
> On Wed, Dec 17, 2014 at 4:39 AM, Sean Owen  wrote:
>
> You should use the same binaries everywhere. The problem here is that
> anonymous functions can get compiled to different names when you build
> differently (potentially), so you actually have one function being called
> when another function is meant.
>
>
> On Wed, Dec 17, 2014 at 12:07 PM, Sun, Rui  wrote:
>> Hi,
>>
>>
>>
>> I encountered a weird bytecode incompatability issue between spark-core
>> jar
>> from mvn repo and official spark prebuilt binary.
>>
>>
>>
>> Steps to reproduce:
>>
>> 1. Download the official pre-built Spark binary 1.1.1 at
>> http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz
>>
>> 2. Launch the Spark cluster in pseudo cluster mode
>>
>> 3. A small scala APP which calls RDD.saveAsObjectFile()
>>
>> scalaVersion := "2.10.4"
>>
>>
>>
>> libraryDependencies ++= Seq(
>>
>>   "org.apache.spark" %% "spark-core" % "1.1.1"
>>
>> )
>>
>>
>>
>> val sc = new SparkContext(args(0), "test") //args[0] is the Spark master
>> URI
>>
>>   val rdd = sc.parallelize(List(1, 2, 3))
>>
>>   rdd.saveAsObjectFile("/tmp/mysaoftmp")
>>
>>   sc.stop
>>
>>
>>
>> throws an exception as follows:
>>
>> [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to
>> stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure:
>> Lost
>> task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com):
>> java.lang.ClassCastException: scala.Tuple2 cannot be cast to
>> scala.collection.Iterator
>>
>> [error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>>
>> [error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>>
>> [error]
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>
>> [error]
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>
>> [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>
>> [error] org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>
>> [error]
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>
>> [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>
>> [error]
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>
>> [error] org.apache.spark.scheduler.Task.run(Task.scala:54)
>>
>> [error]
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>
>> [error]
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
>>
>> [error]
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>
>> [error] java.lang.Thread.run(Thread.java:701)
>>
>>
>>
>> After investigation, I found that this is caused by bytecode
>> incompatibility
>> issue between RDD.class in spark-core_2.10-1.1.1.jar and the pre-built
>> spark
>> assembly respectively.
>>
>>
>>
>> This issue also happens with spark 1.1.0.
>>
>>
>>
>> Is there anything wrong in my usage of Spark? Or anything wrong in the
>> process of deploying Spark module jars to maven repo?
>>
>>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary

2014-12-18 Thread Shixiong Zhu
@Rui do you mean the spark-core jar in the Maven central repo
is incompatible with the same version of the official pre-built Spark
binary? That's really weird. I thought they should have been built from the
same code.

Best Regards,
Shixiong Zhu

2014-12-18 17:22 GMT+08:00 Sean Owen :
>
> Well, it's always a good idea to used matched binary versions. Here it
> is more acutely necessary. You can use a pre built binary -- if you
> use it to compile and also run. Why does it not make sense to publish
> artifacts?
>
> Not sure what you mean about core vs assembly, as the assembly
> contains all of the modules. You don't literally need the same jar
> file.
>
> On Thu, Dec 18, 2014 at 3:20 AM, Sun, Rui  wrote:
> > Not using spark-submit. The App directly communicates with the Spark
> cluster
> > in standalone mode.
> >
> >
> >
> > If mark the Spark dependency as 'provided’, then the spark-core .jar
> > elsewhere must be pointe to in CLASSPATH. However, the pre-built Spark
> > binary only has an assembly jar, not having individual module jars. So
> you
> > don’t have a chance to point to a module.jar which is the same binary as
> > that in the pre-built Spark binary.
> >
> >
> >
> > Maybe the Spark distribution should contain not only the assembly jar but
> > also individual module jars. Any opinion?
> >
> >
> >
> > From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
> > Sent: Thursday, December 18, 2014 2:20 AM
> > To: Sean Owen
> > Cc: Sun, Rui; user@spark.apache.org
> > Subject: Re: weird bytecode incompatability issue between spark-core jar
> > from mvn repo and official spark prebuilt binary
> >
> >
> >
> > Just to clarify, are you running the application using spark-submit after
> > packaging with sbt package ? One thing that might help is to mark the
> Spark
> > dependency as 'provided' as then you shouldn't have the Spark classes in
> > your jar.
> >
> >
> >
> > Thanks
> >
> > Shivaram
> >
> >
> >
> > On Wed, Dec 17, 2014 at 4:39 AM, Sean Owen  wrote:
> >
> > You should use the same binaries everywhere. The problem here is that
> > anonymous functions get compiled to different names when you build
> > different (potentially) so you actually have one function being called
> > when another function is meant.
> >
> >
> > On Wed, Dec 17, 2014 at 12:07 PM, Sun, Rui  wrote:
> >> Hi,
> >>
> >>
> >>
> >> I encountered a weird bytecode incompatability issue between spark-core
> >> jar
> >> from mvn repo and official spark prebuilt binary.
> >>
> >>
> >>
> >> Steps to reproduce:
> >>
> >> 1. Download the official pre-built Spark binary 1.1.1 at
> >> http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz
> >>
> >> 2. Launch the Spark cluster in pseudo cluster mode
> >>
> >> 3. A small scala APP which calls RDD.saveAsObjectFile()
> >>
> >> scalaVersion := "2.10.4"
> >>
> >>
> >>
> >> libraryDependencies ++= Seq(
> >>
> >>   "org.apache.spark" %% "spark-core" % "1.1.1"
> >>
> >> )
> >>
> >>
> >>
> >> val sc = new SparkContext(args(0), "test") //args[0] is the Spark master
> >> URI
> >>
> >>   val rdd = sc.parallelize(List(1, 2, 3))
> >>
> >>   rdd.saveAsObjectFile("/tmp/mysaoftmp")
> >>
> >>   sc.stop
> >>
> >>
> >>
> >> throws an exception as follows:
> >>
> >> [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to
> >> stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure:
> >> Lost
> >> task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com):
> >> java.lang.ClassCastException: scala.Tuple2 cannot be cast to
> >> scala.collection.Iterator
> >>
> >> [error]
>  org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
> >>
> >> [error]
>  org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
> >>
> >> [error]
> >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> >>
> >> [error]
> >> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >>
> >> [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >>
> >> [error]
>  org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> >>
> >> [error]
> >> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >>
> >> [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >>
> >> [error]
> >> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> >>
> >> [error] org.apache.spark.scheduler.Task.run(Task.scala:54)
> >>
> >> [error]
> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> >>
> >> [error]
> >>
> >>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
> >>
> >> [error]
> >>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >>
> >> [error] java.lang.Thread.run(Thread.java:701)
> >>
> >>
> >>
> >> After investigation, I found that this is caused by bytecode
> >> incompatibility
> >> issue between RDD.class in spark-core_2.10-1.1.1.jar and the pre-built
> >> spark
> >> assembly respectively.
> >>
> >>
> >>

Re: Can Spark 1.0.2 run on CDH-4.3.0 with yarn? And Will Spark 1.2.0 support CDH5.1.2 with yarn?

2014-12-18 Thread Sean Owen
The question is really: will Spark 1.1 work with a particular version
of YARN? Many, but not all, versions of YARN are supported. The
"stable" versions are 2.2.x and later. Before that, support is patchier, and
in fact has been removed in Spark 1.3.

The "yarn" profile supports "YARN stable" which is about 2.2.x and
onwards. The "yarn-alpha" profile should work for YARN about 0.23.x.
2.0.x and 2.1.x were a sort of "beta" period and I recall that
yarn-alpha works with some of it, but not all, and there is no
yarn-beta profile.

I believe early CDH 4.x has basically "YARN beta". Later 4.x has
stable. I think I'd try the yarn-alpha profile and see if it compiles.
But the version of YARN in that release may well be among those that
fall in the gap between "alpha" and "stable" support.
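
A build attempt for that case might look something like the following (the
CDH 4.3.0 Hadoop version string is an assumption and may need adjusting):

    mvn -Pyarn-alpha -Dhadoop.version=2.0.0-cdh4.3.0 -DskipTests clean package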

Thankfully, things got a lot more stable from Hadoop / YARN 2.2 or so onwards,
so builds far more often just work without version issues. And CDH 5 is based
on Hadoop 2.3 and then 2.5, so you should be much more able to build
whatever versions together you want.

CDH 5.1.x ships Spark 1.0.x. There should be no problem using 1.1.x,
1.2.x, etc. with it; you just need to make and support your own
binaries. 5.2.x has 1.1.x; 5.3.x will have 1.2.x.

On Thu, Dec 18, 2014 at 9:18 AM, Canoe  wrote:
> I did not compile spark 1.1.0 source code on CDH4.3.0 with yarn successfully.
> Does it support CDH4.3.0 with yarn ?
> And will spark 1.2.0 support CDH5.1.2?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-Spark-1-0-2-run-on-CDH-4-3-0-with-yarn-And-Will-Spark-1-2-0-support-CDH5-1-2-with-yarn-tp20760.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Semantics of foreachPartition()

2014-12-18 Thread Tobias Pfeiffer
Hi,

I have the following code in my application:

tmpRdd.foreach(item => {
  println("abc: " + item)
})
tmpRdd.foreachPartition(iter => {
  iter.map(item => {
println("xyz: " + item)
  })
})

In the output, I see only the "abc" prints (i.e. from the foreach() call).
(The result is the same also if I exchange the order.) What exactly is the
meaning of foreachPartition and how would I use it correctly?

Thanks
Tobias


Re: Can Spark 1.0.2 run on CDH-4.3.0 with yarn? And Will Spark 1.2.0 support CDH5.1.2 with yarn?

2014-12-18 Thread Zhihang Fan
Hi, Sean
   Thank you for your reply. I will try to use Spark 1.1 and 1.2 on CDH 5.x.
 :)


2014-12-18 17:38 GMT+08:00 Sean Owen :
>
> The question is really: will Spark 1.1 work with a particular version
> of YARN? many, but not all versions of YARN are supported. The
> "stable" versions are (2.2.x+). Before that, support is patchier, and
> in fact has been removed in Spark 1.3.
>
> The "yarn" profile supports "YARN stable" which is about 2.2.x and
> onwards. The "yarn-alpha" profile should work for YARN about 0.23.x.
> 2.0.x and 2.1.x were a sort of "beta" period and I recall that
> yarn-alpha works with some of it, but not all, and there is no
> yarn-beta profile.
>
> I believe early CDH 4.x has basically "YARN beta". Later 4.x has
> stable. I think I'd try the yarn-alpha profile and see if it compiles.
> But the version of YARN in that release may well be among those that
> fall in the gap between "alpha" and "stable" support.
>
> Thankfully things got a lot more stable past Hadoop / YARN 2.2 or so,
> so it far more just works without version issues. And CDH 5 is based
> on Hadoop 2.3 and then 2.5, so you should be much more able to build
> whatever versions together that you want.
>
> CDH 5.1.x ships Spark 1.0.x. There should be no problem using 1.1.x,
> 1.2.x, etc. with it; you just need to make and support your own
> binaries. 5.2.x has 1.1.x; 5.3.x will have 1.2.x.
>
> On Thu, Dec 18, 2014 at 9:18 AM, Canoe  wrote:
> > I did not compile spark 1.1.0 source code on CDH4.3.0 with yarn
> successfully.
> > Does it support CDH4.3.0 with yarn ?
> > And will spark 1.2.0 support CDH5.1.2?
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-Spark-1-0-2-run-on-CDH-4-3-0-with-yarn-And-Will-Spark-1-2-0-support-CDH5-1-2-with-yarn-tp20760.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


-- 
谁谓河广,一苇航之 ("Who says the river is wide? A single reed can cross it.")


Re: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary

2014-12-18 Thread Sean Owen
Have a look at https://issues.apache.org/jira/browse/SPARK-2075

It's not quite that the API is different, but indeed building
different 'flavors' of the same version (hadoop1 vs 2) can strangely
lead to this problem, even though the public API is identical and in
theory the API is completely separate from the backend bindings.

IIRC the idea is that only submitting via spark-submit is really
supported, because there you're definitely running exactly what's on
your cluster. That should always work.

This sort of gotcha turns up in some specific cases but you can always
work around it by matching your embedded Spark version as well.

On Thu, Dec 18, 2014 at 9:38 AM, Shixiong Zhu  wrote:
> @Rui do you mean the spark-core jar in the maven central repo are
> incompatible with the same version of the the official pre-built Spark
> binary? That's really weird. I thought they should have used the same codes.
>
> Best Regards,
>
> Shixiong Zhu
>
> 2014-12-18 17:22 GMT+08:00 Sean Owen :
>>
>> Well, it's always a good idea to used matched binary versions. Here it
>> is more acutely necessary. You can use a pre built binary -- if you
>> use it to compile and also run. Why does it not make sense to publish
>> artifacts?
>>
>> Not sure what you mean about core vs assembly, as the assembly
>> contains all of the modules. You don't literally need the same jar
>> file.
>>
>> On Thu, Dec 18, 2014 at 3:20 AM, Sun, Rui  wrote:
>> > Not using spark-submit. The App directly communicates with the Spark
>> > cluster
>> > in standalone mode.
>> >
>> >
>> >
>> > If mark the Spark dependency as 'provided’, then the spark-core .jar
>> > elsewhere must be pointe to in CLASSPATH. However, the pre-built Spark
>> > binary only has an assembly jar, not having individual module jars. So
>> > you
>> > don’t have a chance to point to a module.jar which is the same binary as
>> > that in the pre-built Spark binary.
>> >
>> >
>> >
>> > Maybe the Spark distribution should contain not only the assembly jar
>> > but
>> > also individual module jars. Any opinion?
>> >
>> >
>> >
>> > From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
>> > Sent: Thursday, December 18, 2014 2:20 AM
>> > To: Sean Owen
>> > Cc: Sun, Rui; user@spark.apache.org
>> > Subject: Re: weird bytecode incompatability issue between spark-core jar
>> > from mvn repo and official spark prebuilt binary
>> >
>> >
>> >
>> > Just to clarify, are you running the application using spark-submit
>> > after
>> > packaging with sbt package ? One thing that might help is to mark the
>> > Spark
>> > dependency as 'provided' as then you shouldn't have the Spark classes in
>> > your jar.
>> >
>> >
>> >
>> > Thanks
>> >
>> > Shivaram
>> >
>> >
>> >
>> > On Wed, Dec 17, 2014 at 4:39 AM, Sean Owen  wrote:
>> >
>> > You should use the same binaries everywhere. The problem here is that
>> > anonymous functions get compiled to different names when you build
>> > different (potentially) so you actually have one function being called
>> > when another function is meant.
>> >
>> >
>> > On Wed, Dec 17, 2014 at 12:07 PM, Sun, Rui  wrote:
>> >> Hi,
>> >>
>> >>
>> >>
>> >> I encountered a weird bytecode incompatability issue between spark-core
>> >> jar
>> >> from mvn repo and official spark prebuilt binary.
>> >>
>> >>
>> >>
>> >> Steps to reproduce:
>> >>
>> >> 1. Download the official pre-built Spark binary 1.1.1 at
>> >> http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz
>> >>
>> >> 2. Launch the Spark cluster in pseudo cluster mode
>> >>
>> >> 3. A small scala APP which calls RDD.saveAsObjectFile()
>> >>
>> >> scalaVersion := "2.10.4"
>> >>
>> >>
>> >>
>> >> libraryDependencies ++= Seq(
>> >>
>> >>   "org.apache.spark" %% "spark-core" % "1.1.1"
>> >>
>> >> )
>> >>
>> >>
>> >>
>> >> val sc = new SparkContext(args(0), "test") //args[0] is the Spark
>> >> master
>> >> URI
>> >>
>> >>   val rdd = sc.parallelize(List(1, 2, 3))
>> >>
>> >>   rdd.saveAsObjectFile("/tmp/mysaoftmp")
>> >>
>> >>   sc.stop
>> >>
>> >>
>> >>
>> >> throws an exception as follows:
>> >>
>> >> [error] (run-main-0) org.apache.spark.SparkException: Job aborted due
>> >> to
>> >> stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure:
>> >> Lost
>> >> task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com):
>> >> java.lang.ClassCastException: scala.Tuple2 cannot be cast to
>> >> scala.collection.Iterator
>> >>
>> >> [error]
>> >> org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>> >>
>> >> [error]
>> >> org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>> >>
>> >> [error]
>> >>
>> >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> >>
>> >> [error]
>> >> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> >>
>> >> [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> >>
>> >> [error]
>> >> org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> >>
>> >> [error]
>> >> org.apache.spark.rdd.RDD.compu

Re: Semantics of foreachPartition()

2014-12-18 Thread Tobias Pfeiffer
Hi again,

On Thu, Dec 18, 2014 at 6:43 PM, Tobias Pfeiffer  wrote:
>
> tmpRdd.foreachPartition(iter => {
>   iter.map(item => {
> println("xyz: " + item)
>   })
> })
>

Uh, with iter.foreach(...) it works... the reason apparently being that
iter.map() itself returns an iterator and is thus evaluated lazily (in this
case: never), while iter.foreach() is evaluated immediately.
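
In other words, a working version of the earlier snippet would be:

    tmpRdd.foreachPartition(iter => {
      iter.foreach(item => {
        println("xyz: " + item)
      })
    })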

Thanks
Tobias


Re: java.io.NotSerializableException: org.apache.avro.mapred.AvroKey using spark with avro

2014-12-18 Thread M. Dale
I did not encounter this with my Avro records using Spark 1.1.0 (see 
https://github.com/medale/spark-mail/blob/master/analytics/src/main/scala/com/uebercomputing/analytics/basic/UniqueSenderCounter.scala). 



I do use the default Java serialization but all the fields in my Avro 
object are Serializable (no bytes/ByteBuffer). Does your Avro schema use 
bytes? If so, it seems that it is wrapped in a ByteBuffer, which is not 
Serializable. A quick search turned up a fix here:


https://groups.google.com/forum/#!topic/spark-users/6HQPuxsCe0c

Hope this helps,
Markus

On 12/17/2014 08:14 PM, touchdown wrote:

Yeah, I have the same problem with 1.1.0, but not 1.0.0.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-io-NotSerializableException-org-apache-avro-mapred-AvroKey-using-spark-with-avro-tp15165p20752.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Providing query dsl to Elasticsearch for Spark (2.1.0.Beta3)

2014-12-18 Thread Ian Wilkinson
Quick follow-up: this works sweetly with spark-1.1.1-bin-hadoop2.4.


> On Dec 3, 2014, at 3:31 PM, Ian Wilkinson  wrote:
> 
> Hi,
> 
> I'm trying the Elasticsearch support for Spark (2.1.0.Beta3).
> 
> In the following I provide the query (as query dsl):
> 
> 
> import org.apache.spark.{SparkConf, SparkContext}
> import org.elasticsearch.spark._
> 
> object TryES {
>  val sparkConf = new SparkConf().setAppName("Campaigns")
>  sparkConf.set("es.nodes", ":9200")
>  sparkConf.set("es.nodes.discovery", "false")
>  val sc = new SparkContext(sparkConf)
> 
>  def main(args: Array[String]) {
>val query = """{
>   "query": {
>  ...
>   }
> }
> """
>val campaigns = sc.esRDD("", query)
>campaigns.count();
>  }
> }
> 
> 
> However when I submit this (using spark-1.1.0-bin-hadoop2.4),
> I am experiencing the following exceptions:
> 
> 14/12/03 14:55:27 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, 
> whose tasks have all completed, from pool
> 14/12/03 14:55:27 INFO scheduler.DAGScheduler: Failed to run count at 
> TryES.scala:<...>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: 
> Lost task 1.0 in stage 0.0 (TID 1, localhost): 
> org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot open stream 
> for resource "{
>   "query": {
>   ...
>   }
> }
> 
> 
> Is the query dsl supported with esRDD, or am I missing something
> more fundamental?
> 
> Huge thanks,
> ian
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SPARK-2243 Support multiple SparkContexts in the same JVM

2014-12-18 Thread Sean Owen
Yes, although once you have multiple ClassLoaders, you are operating
as if in multiple JVMs for most intents and purposes. I think the
request for this kind of functionality comes from use cases where
multiple ClassLoaders wouldn't work, like, wanting to have one app (in
one ClassLoader) managing multiple contexts.

On Thu, Dec 18, 2014 at 2:23 AM, Anton Brazhnyk
 wrote:
> Greetings,
>
>
>
> The first comment on the issue says that the reason for not supporting
> multiple contexts is
> “There are numerous assumptions in the code base that uses a shared cache or
> thread local variables or some global identifiers
> which prevent us from using multiple SparkContext's.”
>
>
>
> Could it be worked around by creating those contexts in several classloaders
> with their own copies of the Spark classes?
>
>
>
> Thanks,
>
> Anton

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Incorrect results when calling collect() ?

2014-12-18 Thread Tristan Blakers
Hi,

I’m getting some seemingly invalid results when I collect an RDD. This is
happening in both Spark 1.1.0 and 1.2.0, using Java8 on Mac.

See the following code snippet:

JavaRDD rdd= pairRDD.values();
rdd.foreach( e -> System.out.println ( "RDD Foreach: " + e ) );
rdd.collect().forEach( e -> System.out.println ( "Collected Foreach: " + e
) );

I would expect the results from the two outputters to be identical, but
instead I see:

RDD Foreach: Thing1
RDD Foreach: Thing2
RDD Foreach: Thing3
RDD Foreach: Thing4
(…snip…)
Collected Foreach: Thing1
Collected Foreach: Thing1
Collected Foreach: Thing1
Collected Foreach: Thing2

So essentially the valid entries except for one are replaced by an
equivalent number of duplicate objects. I’ve tried various map and filter
operations, but the results in the RDD always appear correct until I try to
collect() the results. I’ve also found that calling cache() on the RDD
materialises the duplication process such that the RDD Foreach displays the
duplicates too...

Any suggestions for how I can go about debugging this would be massively
appreciated.

Cheers
Tristan


Can we specify driver running on a specific machine of the cluster on yarn-cluster mode?

2014-12-18 Thread LinQili
Hi all, in yarn-cluster mode, can we make the driver run on a specific
machine that we choose in the cluster? Or even on a machine that is not in the cluster?
 

Re: Implementing a spark version of Haskell's partition

2014-12-18 Thread andy petrella
NP man,

The thing is that since you're in a distributed environment, it'd be
cumbersome to do that. Remember that Spark works basically on
blocks/partitions; they are the unit of distribution and parallelization.
That means that actions have to be run against them **after having been
scheduled on the cluster**.
The latter point is the most important: it means that RDDs aren't
"really" created on the driver; the collection is created/transformed/... on
the partitions.
As a consequence, you cannot, on the driver, create such a representation of
the distributed collection, since you haven't seen it yet.
That being said, you can only prepare/define some computations on the
driver that will segregate the data by applying a filter on the nodes.
If you want to keep RDD operators as they are, then yes, you'll need to pass
over the distributed data twice.

The option of using mapPartitions, for instance, would be to create an
RDD[(Seq[A], Seq[A])]; however, it's going to be tricky because you might
have to repartition, otherwise the OOMs might blow up in your face :-D.
I won't pick that one!


A final note: looping over the data is not that much of a problem (especially
if you can cache it), and in fact it's way better to keep the advantages of
resilience etc. that come with Spark.
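
For concreteness, a minimal sketch of the two-pass approach (assuming an
rdd: RDD[A] and a predicate p: A => Boolean; not code from the original thread):

    val cached = rdd.cache()
    val matching    = cached.filter(p)
    val nonMatching = cached.filter(x => !p(x))

The two filters run as separate jobs, but once the first action materializes
the cache, the second pass reads from memory rather than from the source.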

my2c
andy


On Wed Dec 17 2014 at 7:07:05 PM Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi Andy,  thanks for your response. I already thought about filtering
> twice, that was what I meant with "that would be equivalent to applying
> filter twice", but I was thinking if I could do it in a single pass, so
> that could be later generalized to an arbitrary numbers of classes. I would
> also like to be able to generate RDDs instead of partitions of a single
> RDD, so I could use RDD methods like stats() on the fragments. But I think
> there is currently no RDD method that returns more than one RDD for a
> single input RDD, so maybe there is some design limitation on Spark that
> prevents this?
>
> Again, thanks for your answer.
>
> Greetings,
>
> Juan
> El 17/12/2014 18:15, "andy petrella"  escribió:
>
> yo,
>>
>> First, here is the scala version:
>> http://www.scala-lang.org/api/current/index.html#scala.collection.Seq@partition(p:A=
>> >Boolean):(Repr,Repr)
>>
>> Second: RDD is distributed so what you'll have to do is to partition each
>> partition each partition (:-D) or create two RDDs with by filtering twice →
>> hence tasks will be scheduled distinctly, and data read twice. Choose
>> what's best for you!
>>
>> hth,
>> andy
>>
>>
>> On Wed Dec 17 2014 at 5:57:56 PM Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I would like to be able to split a RDD in two pieces according to a
>>> predicate. That would be equivalent to applying filter twice, with the
>>> predicate and its complement, which is also similar to Haskell's partition
>>> list function (
>>> http://hackage.haskell.org/package/base-4.7.0.1/docs/Data-List.html).
>>> There is currently any way to do this in Spark?, or maybe anyone has a
>>> suggestion about how to implent this by modifying the Spark source. I think
>>> this is valuable because sometimes I need to split a RDD in several groups
>>> that are too big to fit in the memory of a single thread, so pair RDDs are
>>> not solution for those cases. A generalization to n parts of Haskell's
>>> partition would do the job.
>>>
>>> Thanks a lot for your help.
>>>
>>> Greetings,
>>>
>>> Juan Rodriguez
>>>
>>


Re: Incorrect results when calling collect() ?

2014-12-18 Thread Sean Owen
It sounds a lot like your values are mutable classes and you are
mutating or reusing them somewhere? It might work until you actually
try to materialize them all and find that many point to the same object.

On Thu, Dec 18, 2014 at 10:06 AM, Tristan Blakers  wrote:
> Hi,
>
> I’m getting some seemingly invalid results when I collect an RDD. This is
> happening in both Spark 1.1.0 and 1.2.0, using Java8 on Mac.
>
> See the following code snippet:
>
> JavaRDD rdd= pairRDD.values();
> rdd.foreach( e -> System.out.println ( "RDD Foreach: " + e ) );
> rdd.collect().forEach( e -> System.out.println ( "Collected Foreach: " + e )
> );
>
> I would expect the results from the two outputters to be identical, but
> instead I see:
>
> RDD Foreach: Thing1
> RDD Foreach: Thing2
> RDD Foreach: Thing3
> RDD Foreach: Thing4
> (…snip…)
> Collected Foreach: Thing1
> Collected Foreach: Thing1
> Collected Foreach: Thing1
> Collected Foreach: Thing2
>
> So essentially the valid entries except for one are replaced by an
> equivalent number of duplicate objects. I’ve tried various map and filter
> operations, but the results in the RDD always appear correct until I try to
> collect() the results. I’ve also found that calling cache() on the RDD
> materialises the duplication process such that the RDD Foreach displays the
> duplicates too...
>
> Any suggestions for how I can go about debugging this would be massively
> appreciated.
>
> Cheers
> Tristan

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Implementing a spark version of Haskell's partition

2014-12-18 Thread Juan Rodríguez Hortalá
Hi Andy,

Thanks again for your thoughts on this. I haven't found much information
about the internals of Spark, so I find these kinds of explanations of its
low-level mechanisms very useful and interesting. It's also nice to know
that the two-pass approach is a viable solution.

Regards,

Juan

2014-12-18 11:10 GMT+01:00 andy petrella :
>
> NP man,
>
> The thing is that since you're in a dist env, it'd be cumbersome to do
> that. Remember that Spark works basically on block/partition, they are the
> unit of distribution and parallelization.
> That means that actions have to be run against it **after having been
> scheduled on the cluster**.
> The latter point is the most important, it means that the RDD aren't
> "really" created on the driver the collection is created/transformed/... on
> the partition.
> Consequence of what you cannot, on the driver, create such representation
> on the distributed collection since you haven't seen it yet.
> That being said, you can only prepare/define some computations on the
> driver that will segregate the data by applying a filter on the nodes.
> If you want to keep RDD operators as they are, yes you'll need to pass
> over the distributed data twice.
>
> The option of using the mapPartitions for instance, will be to create a
> RDD[Seq[A], Seq[A]] however it's going to be tricky because you'll might
> have to repartition otherwise the OOMs might blow at your face :-D.
> I won't pick that one!
>
>
> A final note: looping over the data is not that a problem (specially if
> you can cache it), and in fact it's way better to keep advantage of
> resilience etc etc that comes with Spark.
>
> my2c
> andy
>
>
> On Wed Dec 17 2014 at 7:07:05 PM Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi Andy,  thanks for your response. I already thought about filtering
>> twice, that was what I meant with "that would be equivalent to applying
>> filter twice", but I was thinking if I could do it in a single pass, so
>> that could be later generalized to an arbitrary numbers of classes. I would
>> also like to be able to generate RDDs instead of partitions of a single
>> RDD, so I could use RDD methods like stats() on the fragments. But I think
>> there is currently no RDD method that returns more than one RDD for a
>> single input RDD, so maybe there is some design limitation on Spark that
>> prevents this?
>>
>> Again, thanks for your answer.
>>
>> Greetings,
>>
>> Juan
>> El 17/12/2014 18:15, "andy petrella"  escribió:
>>
>> yo,
>>>
>>> First, here is the scala version:
>>> http://www.scala-lang.org/api/current/index.html#scala.collection.Seq@partition(p:A=
>>> >Boolean):(Repr,Repr)
>>>
>>> Second: RDD is distributed so what you'll have to do is to partition
>>> each partition each partition (:-D) or create two RDDs with by filtering
>>> twice → hence tasks will be scheduled distinctly, and data read twice.
>>> Choose what's best for you!
>>>
>>> hth,
>>> andy
>>>
>>>
>>> On Wed Dec 17 2014 at 5:57:56 PM Juan Rodríguez Hortalá <
>>> juan.rodriguez.hort...@gmail.com> wrote:
>>>
 Hi all,

 I would like to be able to split a RDD in two pieces according to a
 predicate. That would be equivalent to applying filter twice, with the
 predicate and its complement, which is also similar to Haskell's partition
 list function (
 http://hackage.haskell.org/package/base-4.7.0.1/docs/Data-List.html).
 There is currently any way to do this in Spark?, or maybe anyone has a
 suggestion about how to implent this by modifying the Spark source. I think
 this is valuable because sometimes I need to split a RDD in several groups
 that are too big to fit in the memory of a single thread, so pair RDDs are
 not solution for those cases. A generalization to n parts of Haskell's
 partition would do the job.

 Thanks a lot for your help.

 Greetings,

 Juan Rodriguez

>>>


Re: Help with updateStateByKey

2014-12-18 Thread Tathagata Das
Another good starting point for playing with updateStateByKey is the
StatefulNetworkWordCount example. See the streaming examples directory in the
Spark repository.
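
For the sessionization case quoted below, the update function would have
roughly this shape (a sketch only, assuming ipTimeStamp is the DStream of
(IP, (requestPage, time, time)) pairs; this is not the exact code from the
thread):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.streaming.StreamingContext._  // pair DStream ops on older Spark versions

    def updateSession(newHits: Seq[(String, Long, Long)],
                      state: Option[ArrayBuffer[(String, Long, Long)]])
        : Option[ArrayBuffer[(String, Long, Long)]] = {
      // Append this batch's hits for the IP to whatever state already exists.
      val buf = state.getOrElse(ArrayBuffer.empty[(String, Long, Long)])
      buf ++= newHits
      Some(buf)
    }

    val sessions = ipTimeStamp.updateStateByKey(updateSession _)

Note that updateStateByKey is applied to the keyed DStream directly, so the
groupByKey step should not be needed (the update function already receives all
of a key's new values for each batch), and it requires checkpointing to be
enabled on the StreamingContext (ssc.checkpoint(...)).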

TD



On Thu, Dec 18, 2014 at 6:07 AM, Pierce Lamb
 wrote:
> I am trying to run stateful Spark Streaming computations over (fake)
> apache web server logs read from Kafka. The goal is to "sessionize"
> the web traffic similar to this blog post:
> http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
>
> The only difference is that I want to "sessionize" each page the IP
> hits, instead of the entire session. I was able to do this reading
> from a file of fake web traffic using Spark in batch mode, but now I
> want to do it in a streaming context.
>
> Log files are read from Kafka and parsed into K/V pairs of
>
> (String, (String, Long, Long)) or
>
> (IP, (requestPage, time, time))
>
> I then call "groupByKey()" on this K/V pair. In batch mode, this would
> produce a:
>
> (String, CollectionBuffer((String, Long, Long), ...) or
>
> (IP, CollectionBuffer((requestPage, time, time), ...)
>
> In a StreamingContext, it produces a:
>
> (String, ArrayBuffer((String, Long, Long), ...) like so:
>
> (183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>
> However, as the next microbatch (DStream) arrives, this information is
> discarded. Ultimately what I want is for that ArrayBuffer to fill up
> over time as a given IP continues to interact and to run some
> computations on its data to "sessionize" the page time. I believe the
> operator to make that happen is "updateStateByKey." I'm having some
> trouble with this operator (I'm new to both Spark & Scala); any help
> is appreciated.
>
> Thus far:
>
> val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
>
>
> def updateGroupByKey(
>     a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
>     b: Option[(String, ArrayBuffer[(String, Long, Long)])]
>   ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
>
> }
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming Python APIs?

2014-12-18 Thread Tathagata Das
A more updated version of the streaming programming guide is here

http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html

Please refer to this until we make the official release of Spark 1.2

TD

On Tue, Dec 16, 2014 at 3:50 PM, smallmonkey...@hotmail.com
 wrote:
> Hi zhu:
> maybe there is no Python API for Spark Streaming
> baishuo
> 
> smallmonkey...@hotmail.com
>
>
> From: Xiaoyong Zhu
> Date: 2014-12-15 10:52
> To: user@spark.apache.org
> Subject: Spark Streaming Python APIs?
>
> Hi spark experts
>
>
>
> Are there any Python APIs for Spark Streaming? I didn’t find the Python APIs
> in Spark Streaming programming guide..
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html
>
>
>
> Xiaoyong
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Incorrect results when calling collect() ?

2014-12-18 Thread Tristan Blakers
I suspected the same thing, but because the underlying data classes are
deserialised by Avro, I think they have to be mutable, as you need to provide
a no-args constructor with settable fields.

Nothing is being cached in my code anywhere, and this can be reproduced
using data directly out of the newAPIHadoopRDD() call. Debug statements added
to the constructors of the various classes show that the right number are
being constructed, though the watches set on some of the fields aren't always
triggering, so I suspect the serialisation may be doing something a bit too
clever?

Tristan

On 18 December 2014 at 21:25, Sean Owen  wrote:
>
> It sounds a lot like your values are mutable classes and you are
> mutating or reusing them somewhere? It might work until you actually
> try to materialize them all and find many point to the same object.
>
> On Thu, Dec 18, 2014 at 10:06 AM, Tristan Blakers 
> wrote:
> > Hi,
> >
> > I’m getting some seemingly invalid results when I collect an RDD. This is
> > happening in both Spark 1.1.0 and 1.2.0, using Java8 on Mac.
> >
> > See the following code snippet:
> >
> > JavaRDD rdd= pairRDD.values();
> > rdd.foreach( e -> System.out.println ( "RDD Foreach: " + e ) );
> > rdd.collect().forEach( e -> System.out.println ( "Collected Foreach: " +
> e )
> > );
> >
> > I would expect the results from the two outputters to be identical, but
> > instead I see:
> >
> > RDD Foreach: Thing1
> > RDD Foreach: Thing2
> > RDD Foreach: Thing3
> > RDD Foreach: Thing4
> > (…snip…)
> > Collected Foreach: Thing1
> > Collected Foreach: Thing1
> > Collected Foreach: Thing1
> > Collected Foreach: Thing2
> >
> > So essentially the valid entries except for one are replaced by an
> > equivalent number of duplicate objects. I’ve tried various map and filter
> > operations, but the results in the RDD always appear correct until I try
> to
> > collect() the results. I’ve also found that calling cache() on the RDD
> > materialises the duplication process such that the RDD Foreach displays
> the
> > duplicates too...
> >
> > Any suggestions for how I can go about debugging this would be massively
> > appreciated.
> >
> > Cheers
> > Tristan
>


Re: Incorrect results when calling collect() ?

2014-12-18 Thread Sean Owen
Being mutable is fine; reusing and mutating the objects is the issue.
And yes the objects you get back from Hadoop are reused by Hadoop
InputFormats. You should just map the objects to a clone before using
them where you need them to exist all independently at once, like
before a collect().

(That said... generally speaking collect() involves copying from
workers to the driver, which necessarily means a copy anyway. I
suspect this isn't working that way for you since you're running it
all locally?)
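
A rough illustration of the defensive-copy idea for Avro records coming out of
newAPIHadoopRDD (the deepCopy call is Avro's SpecificData API; whether it fits
here depends on the record type, so treat it as an assumption):

    import org.apache.avro.specific.SpecificData

    // Clone each record before Hadoop's record reader reuses the instance.
    val copied = rdd.map(rec => SpecificData.get().deepCopy(rec.getSchema, rec))
    copied.collect().foreach(println)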

On Thu, Dec 18, 2014 at 10:42 AM, Tristan Blakers  wrote:
> Suspected the same thing, but because the underlying data classes are
> deserialised by Avro I think they have to be mutable as you need to provide
> the no-args constructor with settable fields.
>
> Nothing is being cached in my code anywhere, and this can be reproduced
> using data directly out of the newAPIHadoopRDD() call. Debugs added to the
> constructors of the various classes show that the right number are being
> constructed, though the watches set on some of the fields aren’t always
> triggering, so suspect maybe the serialisation is doing something a bit too
> clever?
>
> Tristan
>
> On 18 December 2014 at 21:25, Sean Owen  wrote:
>>
>> It sounds a lot like your values are mutable classes and you are
>> mutating or reusing them somewhere? It might work until you actually
>> try to materialize them all and find many point to the same object.
>>
>> On Thu, Dec 18, 2014 at 10:06 AM, Tristan Blakers 
>> wrote:
>> > Hi,
>> >
>> > I’m getting some seemingly invalid results when I collect an RDD. This
>> > is
>> > happening in both Spark 1.1.0 and 1.2.0, using Java8 on Mac.
>> >
>> > See the following code snippet:
>> >
>> > JavaRDD rdd= pairRDD.values();
>> > rdd.foreach( e -> System.out.println ( "RDD Foreach: " + e ) );
>> > rdd.collect().forEach( e -> System.out.println ( "Collected Foreach: " +
>> > e )
>> > );
>> >
>> > I would expect the results from the two outputters to be identical, but
>> > instead I see:
>> >
>> > RDD Foreach: Thing1
>> > RDD Foreach: Thing2
>> > RDD Foreach: Thing3
>> > RDD Foreach: Thing4
>> > (…snip…)
>> > Collected Foreach: Thing1
>> > Collected Foreach: Thing1
>> > Collected Foreach: Thing1
>> > Collected Foreach: Thing2
>> >
>> > So essentially the valid entries except for one are replaced by an
>> > equivalent number of duplicate objects. I’ve tried various map and
>> > filter
>> > operations, but the results in the RDD always appear correct until I try
>> > to
>> > collect() the results. I’ve also found that calling cache() on the RDD
>> > materialises the duplication process such that the RDD Foreach displays
>> > the
>> > duplicates too...
>> >
>> > Any suggestions for how I can go about debugging this would be massively
>> > appreciated.
>> >
>> > Cheers
>> > Tristan

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary

2014-12-18 Thread Sun, Rui
Yes, https://issues.apache.org/jira/browse/SPARK-2075 is what I hit. Thanks!

I think we need to address this issue, or at least document it.

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Thursday, December 18, 2014 5:47 PM
To: Shixiong Zhu
Cc: Sun, Rui; shiva...@eecs.berkeley.edu; user@spark.apache.org
Subject: Re: weird bytecode incompatability issue between spark-core jar from 
mvn repo and official spark prebuilt binary

Have a look at https://issues.apache.org/jira/browse/SPARK-2075

It's not quite that the API is different, but indeed building different 
'flavors' of the same version (hadoop1 vs 2) can strangely lead to this 
problem, even though the public API is identical and in theory the API is 
completely separate from the backend bindings.

IIRC the idea is that only submitting via spark-submit is really supported, 
because there you're definitely running exactly what's on your cluster. That 
should always work.

This sort of gotcha turns up in some specific cases but you can always work 
around it by matching your embedded Spark version as well.

On Thu, Dec 18, 2014 at 9:38 AM, Shixiong Zhu  wrote:
> @Rui do you mean the spark-core jar in the maven central repo are 
> incompatible with the same version of the the official pre-built Spark 
> binary? That's really weird. I thought they should have used the same codes.
>
> Best Regards,
>
> Shixiong Zhu
>
> 2014-12-18 17:22 GMT+08:00 Sean Owen :
>>
>> Well, it's always a good idea to used matched binary versions. Here 
>> it is more acutely necessary. You can use a pre built binary -- if 
>> you use it to compile and also run. Why does it not make sense to 
>> publish artifacts?
>>
>> Not sure what you mean about core vs assembly, as the assembly 
>> contains all of the modules. You don't literally need the same jar 
>> file.
>>
>> On Thu, Dec 18, 2014 at 3:20 AM, Sun, Rui  wrote:
>> > Not using spark-submit. The App directly communicates with the 
>> > Spark cluster in standalone mode.
>> >
>> >
>> >
>> > If mark the Spark dependency as 'provided’, then the spark-core 
>> > .jar elsewhere must be pointe to in CLASSPATH. However, the 
>> > pre-built Spark binary only has an assembly jar, not having 
>> > individual module jars. So you don’t have a chance to point to a 
>> > module.jar which is the same binary as that in the pre-built Spark 
>> > binary.
>> >
>> >
>> >
>> > Maybe the Spark distribution should contain not only the assembly 
>> > jar but also individual module jars. Any opinion?
>> >
>> >
>> >
>> > From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
>> > Sent: Thursday, December 18, 2014 2:20 AM
>> > To: Sean Owen
>> > Cc: Sun, Rui; user@spark.apache.org
>> > Subject: Re: weird bytecode incompatability issue between 
>> > spark-core jar from mvn repo and official spark prebuilt binary
>> >
>> >
>> >
>> > Just to clarify, are you running the application using spark-submit 
>> > after packaging with sbt package ? One thing that might help is to 
>> > mark the Spark dependency as 'provided' as then you shouldn't have 
>> > the Spark classes in your jar.
>> >
>> >
>> >
>> > Thanks
>> >
>> > Shivaram
>> >
>> >
>> >
>> > On Wed, Dec 17, 2014 at 4:39 AM, Sean Owen  wrote:
>> >
>> > You should use the same binaries everywhere. The problem here is 
>> > that anonymous functions get compiled to different names when you 
>> > build different (potentially) so you actually have one function 
>> > being called when another function is meant.
>> >
>> >
>> > On Wed, Dec 17, 2014 at 12:07 PM, Sun, Rui  wrote:
>> >> Hi,
>> >>
>> >>
>> >>
>> >> I encountered a weird bytecode incompatability issue between 
>> >> spark-core jar from mvn repo and official spark prebuilt binary.
>> >>
>> >>
>> >>
>> >> Steps to reproduce:
>> >>
>> >> 1. Download the official pre-built Spark binary 1.1.1 at
>> >> http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz
>> >>
>> >> 2. Launch the Spark cluster in pseudo cluster mode
>> >>
>> >> 3. A small scala APP which calls RDD.saveAsObjectFile()
>> >>
>> >> scalaVersion := "2.10.4"
>> >>
>> >>
>> >>
>> >> libraryDependencies ++= Seq(
>> >>
>> >>   "org.apache.spark" %% "spark-core" % "1.1.1"
>> >>
>> >> )
>> >>
>> >>
>> >>
>> >> val sc = new SparkContext(args(0), "test") //args[0] is the Spark 
>> >> master URI
>> >>
>> >>   val rdd = sc.parallelize(List(1, 2, 3))
>> >>
>> >>   rdd.saveAsObjectFile("/tmp/mysaoftmp")
>> >>
>> >>   sc.stop
>> >>
>> >>
>> >>
>> >> throws an exception as follows:
>> >>
>> >> [error] (run-main-0) org.apache.spark.SparkException: Job aborted 
>> >> due to stage failure: Task 1 in stage 0.0 failed 4 times, most 
>> >> recent failure:
>> >> Lost
>> >> task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com):
>> >> java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
>> >> scala.collection.Iterator
>> >>
>> >> [error]
>> >> org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>> >>
>> >> [error]

create table in yarn-cluster mode vs yarn-client mode

2014-12-18 Thread Chirag Aggarwal
Hi,

I have a simple app in which I am trying to create a table. I am able to create 
the table when running the app in yarn-client mode, but not in yarn-cluster mode.
Is this a known issue? Has it already been fixed?

Please note that I am using spark-1.1 over hadoop-2.4.0

App:
-
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

object HiveSpark {
  case class Record(key: Int, value: String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HiveSpark")
    val sc = new SparkContext(sparkConf)

    val hiveContext = new HiveContext(sc)
    import hiveContext._

    hql("use ttt")
    hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

    sc.stop()
  }
}

Thanks,
Chirag



RE: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary

2014-12-18 Thread Sun, Rui
Owen,

Since we have individual module jars published to the central Maven repo for an 
official release, we need to make sure the official Spark assembly jar is 
assembled exactly from those jars, so there will be no binary 
compatibility issue. We could also publish the official assembly jar to Maven for 
convenience. I suspect there is some mistake in the release procedure for an 
official release.

Yes, you are correct: the assembly contains all of the modules :) But I am not 
sure, if the app wants to build itself as an assembly including the dependent 
modules, can it do so in that case?

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Thursday, December 18, 2014 5:23 PM
To: Sun, Rui
Cc: shiva...@eecs.berkeley.edu; user@spark.apache.org
Subject: Re: weird bytecode incompatability issue between spark-core jar from 
mvn repo and official spark prebuilt binary

Well, it's always a good idea to used matched binary versions. Here it is more 
acutely necessary. You can use a pre built binary -- if you use it to compile 
and also run. Why does it not make sense to publish artifacts?

Not sure what you mean about core vs assembly, as the assembly contains all of 
the modules. You don't literally need the same jar file.

On Thu, Dec 18, 2014 at 3:20 AM, Sun, Rui  wrote:
> Not using spark-submit. The App directly communicates with the Spark 
> cluster in standalone mode.
>
>
>
> If mark the Spark dependency as 'provided’, then the spark-core .jar 
> elsewhere must be pointe to in CLASSPATH. However, the pre-built Spark 
> binary only has an assembly jar, not having individual module jars. So 
> you don’t have a chance to point to a module.jar which is the same 
> binary as that in the pre-built Spark binary.
>
>
>
> Maybe the Spark distribution should contain not only the assembly jar 
> but also individual module jars. Any opinion?
>
>
>
> From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
> Sent: Thursday, December 18, 2014 2:20 AM
> To: Sean Owen
> Cc: Sun, Rui; user@spark.apache.org
> Subject: Re: weird bytecode incompatability issue between spark-core 
> jar from mvn repo and official spark prebuilt binary
>
>
>
> Just to clarify, are you running the application using spark-submit 
> after packaging with sbt package ? One thing that might help is to 
> mark the Spark dependency as 'provided' as then you shouldn't have the 
> Spark classes in your jar.
>
>
>
> Thanks
>
> Shivaram
>
>
>
> On Wed, Dec 17, 2014 at 4:39 AM, Sean Owen  wrote:
>
> You should use the same binaries everywhere. The problem here is that 
> anonymous functions get compiled to different names when you build 
> different (potentially) so you actually have one function being called 
> when another function is meant.
>
>
> On Wed, Dec 17, 2014 at 12:07 PM, Sun, Rui  wrote:
>> Hi,
>>
>>
>>
>> I encountered a weird bytecode incompatability issue between 
>> spark-core jar from mvn repo and official spark prebuilt binary.
>>
>>
>>
>> Steps to reproduce:
>>
>> 1. Download the official pre-built Spark binary 1.1.1 at
>> http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz
>>
>> 2. Launch the Spark cluster in pseudo cluster mode
>>
>> 3. A small scala APP which calls RDD.saveAsObjectFile()
>>
>> scalaVersion := "2.10.4"
>>
>>
>>
>> libraryDependencies ++= Seq(
>>
>>   "org.apache.spark" %% "spark-core" % "1.1.1"
>>
>> )
>>
>>
>>
>> val sc = new SparkContext(args(0), "test") //args[0] is the Spark 
>> master URI
>>
>>   val rdd = sc.parallelize(List(1, 2, 3))
>>
>>   rdd.saveAsObjectFile("/tmp/mysaoftmp")
>>
>>   sc.stop
>>
>>
>>
>> throws an exception as follows:
>>
>> [error] (run-main-0) org.apache.spark.SparkException: Job aborted due 
>> to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure:
>> Lost
>> task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com):
>> java.lang.ClassCastException: scala.Tuple2 cannot be cast to 
>> scala.collection.Iterator
>>
>> [error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>>
>> [error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>>
>> [error]
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:
>> 35)
>>
>> [error]
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>
>> [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>
>> [error] org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>
>> [error]
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>
>> [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>
>> [error]
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>
>> [error] org.apache.spark.scheduler.Task.run(Task.scala:54)
>>
>> [error]
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>
>> [error]
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.
>> java:1146)
>>
>> [error]
>>
>> ja

Re: java.io.NotSerializableException: org.apache.avro.mapred.AvroKey using spark with avro

2014-12-18 Thread Anish Haldiya
Hi, I had the same problem.

One option (starting with Spark 1.2, which is currently in preview) is to
use the Avro library for Spark SQL.

The other is using Kryo serialization.
By default Spark uses Java serialization; you can specify Kryo
serialization when creating the Spark context.

val conf = new SparkConf().set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

This worked for me.
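
A further sketch: on Spark 1.2 (mentioned above as currently in preview),
SparkConf.registerKryoClasses can also be used to register the Avro classes with
Kryo for more compact serialization. AvroKey is taken from the subject line; any
generated record classes of your own would be registered the same way.

import org.apache.avro.mapred.AvroKey
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // registerKryoClasses is available from Spark 1.2; older versions would need a
  // custom spark.kryo.registrator instead.
  .registerKryoClasses(Array(classOf[AvroKey[_]]))
val sc = new SparkContext(conf)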

Regards,
Anish




Re: Incorrect results when calling collect() ?

2014-12-18 Thread Tristan Blakers
Recording the outcome here for the record. Based on Sean’s advice I’ve
confirmed that making defensive copies of records that will be collected
avoids this problem - it does seem like Avro is being a bit too aggressive
when deciding it’s safe to reuse an object for a new record.
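
A minimal sketch of the defensive-copy approach (MyRecord stands in for whichever
Avro-generated class is being collected, and rdd is whatever RDD of those records
you are about to collect; generated SpecificRecord classes expose a
newBuilder(other) copy helper):

// Clone each record before collecting, so Avro/Hadoop object reuse cannot leave
// every collected element pointing at the same mutated instance.
val safeRdd = rdd.map(r => MyRecord.newBuilder(r).build())
val collected = safeRdd.collect()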

On 18 December 2014 at 21:50, Sean Owen  wrote:
>
> Being mutable is fine; reusing and mutating the objects is the issue.
> And yes the objects you get back from Hadoop are reused by Hadoop
> InputFormats. You should just map the objects to a clone before using
> them where you need them to exist all independently at once, like
> before a collect().
>
> (That said... generally speaking collect() involves copying from
> workers to the driver, which necessarily means a copy anyway. I
> suspect this isn't working that way for you since you're running it
> all locally?)
>
> On Thu, Dec 18, 2014 at 10:42 AM, Tristan Blakers 
> wrote:
> > Suspected the same thing, but because the underlying data classes are
> > deserialised by Avro I think they have to be mutable as you need to
> provide
> > the no-args constructor with settable fields.
> >
> > Nothing is being cached in my code anywhere, and this can be reproduced
> > using data directly out of the newAPIHadoopRDD() call. Debugs added to
> the
> > constructors of the various classes show that the right number are being
> > constructed, though the watches set on some of the fields aren’t always
> > triggering, so suspect maybe the serialisation is doing something a bit
> too
> > clever?
> >
> > Tristan
> >
> > On 18 December 2014 at 21:25, Sean Owen  wrote:
> >>
> >> It sounds a lot like your values are mutable classes and you are
> >> mutating or reusing them somewhere? It might work until you actually
> >> try to materialize them all and find many point to the same object.
> >>
> >> On Thu, Dec 18, 2014 at 10:06 AM, Tristan Blakers <
> tris...@blackfrog.org>
> >> wrote:
> >> > Hi,
> >> >
> >> > I’m getting some seemingly invalid results when I collect an RDD. This
> >> > is
> >> > happening in both Spark 1.1.0 and 1.2.0, using Java8 on Mac.
> >> >
> >> > See the following code snippet:
> >> >
> >> > JavaRDD rdd= pairRDD.values();
> >> > rdd.foreach( e -> System.out.println ( "RDD Foreach: " + e ) );
> >> > rdd.collect().forEach( e -> System.out.println ( "Collected Foreach:
> " +
> >> > e )
> >> > );
> >> >
> >> > I would expect the results from the two outputters to be identical,
> but
> >> > instead I see:
> >> >
> >> > RDD Foreach: Thing1
> >> > RDD Foreach: Thing2
> >> > RDD Foreach: Thing3
> >> > RDD Foreach: Thing4
> >> > (…snip…)
> >> > Collected Foreach: Thing1
> >> > Collected Foreach: Thing1
> >> > Collected Foreach: Thing1
> >> > Collected Foreach: Thing2
> >> >
> >> > So essentially the valid entries except for one are replaced by an
> >> > equivalent number of duplicate objects. I’ve tried various map and
> >> > filter
> >> > operations, but the results in the RDD always appear correct until I
> try
> >> > to
> >> > collect() the results. I’ve also found that calling cache() on the RDD
> >> > materialises the duplication process such that the RDD Foreach
> displays
> >> > the
> >> > duplicates too...
> >> >
> >> > Any suggestions for how I can go about debugging this would be
> massively
> >> > appreciated.
> >> >
> >> > Cheers
> >> > Tristan
>


Re: Spark SQL DSL for joins?

2014-12-18 Thread Jerry Raj
Thanks, that helped. And I needed SchemaRDD.as() to provide an alias for 
the RDD.


-Jerry

On 17/12/14 12:12 pm, Tobias Pfeiffer wrote:

Jerry,

On Wed, Dec 17, 2014 at 3:35 PM, Jerry Raj  wrote:

Another problem with the DSL:

t1.where('term == "dmin").count() returns zero.


Looks like you need ===:
https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD

Tobias



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No disk single pass RDD aggregation

2014-12-18 Thread Jim Carroll
Hi,

This was all my fault. It turned out I had a line of code buried in a
library that did a "repartition." I used this library to wrap an RDD to
present it to legacy code as a different interface. That's what was causing
the data to spill to disk.

The really stupid thing is that it took me the better part of a day, and
several misguided emails to this list (including the one that started this
thread), to find it.

Sorry about that.

Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20763.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



pyspark 1.1.1 on windows saveAsTextFile - NullPointerException

2014-12-18 Thread mj
Hi,

I'm trying to use pyspark to save a simple rdd to a text file (code below),
but it keeps throwing an error.

- Python Code -
items=["Hello", "world"]
items2 = sc.parallelize(items)
items2.coalesce(1).saveAsTextFile('c:/tmp/python_out.csv')


- Error --C:\Python27\python.exe "C:/Users/Mark
Jones/PycharmProjects/spark_test/spark_error_sample.py"
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
14/12/18 13:00:53 INFO SecurityManager: Changing view acls to: Mark Jones,
14/12/18 13:00:53 INFO SecurityManager: Changing modify acls to: Mark Jones,
14/12/18 13:00:53 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(Mark Jones, );
users with modify permissions: Set(Mark Jones, )
14/12/18 13:00:53 INFO Slf4jLogger: Slf4jLogger started
14/12/18 13:00:53 INFO Remoting: Starting remoting
14/12/18 13:00:53 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@192.168.19.83:54548]
14/12/18 13:00:53 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkDriver@192.168.19.83:54548]
14/12/18 13:00:53 INFO Utils: Successfully started service 'sparkDriver' on
port 54548.
14/12/18 13:00:53 INFO SparkEnv: Registering MapOutputTracker
14/12/18 13:00:53 INFO SparkEnv: Registering BlockManagerMaster
14/12/18 13:00:53 INFO DiskBlockManager: Created local directory at
C:\Users\MARKJO~1\AppData\Local\Temp\spark-local-20141218130053-1ab9
14/12/18 13:00:53 INFO Utils: Successfully started service 'Connection
manager for block manager' on port 54551.
14/12/18 13:00:53 INFO ConnectionManager: Bound socket to port 54551 with id
= ConnectionManagerId(192.168.19.83,54551)
14/12/18 13:00:53 INFO MemoryStore: MemoryStore started with capacity 265.1
MB
14/12/18 13:00:53 INFO BlockManagerMaster: Trying to register BlockManager
14/12/18 13:00:53 INFO BlockManagerMasterActor: Registering block manager
192.168.19.83:54551 with 265.1 MB RAM
14/12/18 13:00:53 INFO BlockManagerMaster: Registered BlockManager
14/12/18 13:00:53 INFO HttpFileServer: HTTP File server directory is
C:\Users\MARKJO~1\AppData\Local\Temp\spark-a43340e8-2621-46b8-a44e-8874dd178393
14/12/18 13:00:53 INFO HttpServer: Starting HTTP Server
14/12/18 13:00:54 INFO Utils: Successfully started service 'HTTP file
server' on port 54552.
14/12/18 13:00:54 INFO Utils: Successfully started service 'SparkUI' on port
4040.
14/12/18 13:00:54 INFO SparkUI: Started SparkUI at http://192.168.19.83:4040
14/12/18 13:00:54 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/12/18 13:00:54 ERROR Shell: Failed to locate the winutils binary in the
hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in
the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
at org.apache.hadoop.util.Shell.(Shell.java:326)
at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
at org.apache.hadoop.security.Groups.(Groups.java:77)
at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
at 
org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:36)
at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:109)
at 
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala)
at org.apache.spark.SparkContext.(SparkContext.scala:228)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:53)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
14/12/18 13:00:54 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@192.168.19.83:54548/user/HeartbeatReceiver
14/12/18 13:00:55 INFO deprecation: mapred.tip.id is dep

Re: pyspark 1.1.1 on windows saveAsTextFile - NullPointerException

2014-12-18 Thread Akhil Das
It seems you are missing HADOOP_HOME in the environment. As it says:

java.io.IOException: Could not locate executable *null*\bin\winutils.exe in
the Hadoop binaries.

That null is supposed to be your HADOOP_HOME.

Thanks
Best Regards

On Thu, Dec 18, 2014 at 7:10 PM, mj  wrote:
>
> Hi,
>
> I'm trying to use pyspark to save a simple rdd to a text file (code below),
> but it keeps throwing an error.
>
> - Python Code -
> items=["Hello", "world"]
> items2 = sc.parallelize(items)
> items2.coalesce(1).saveAsTextFile('c:/tmp/python_out.csv')
>
>
> - Error --C:\Python27\python.exe "C:/Users/Mark
> Jones/PycharmProjects/spark_test/spark_error_sample.py"
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 14/12/18 13:00:53 INFO SecurityManager: Changing view acls to: Mark Jones,
> 14/12/18 13:00:53 INFO SecurityManager: Changing modify acls to: Mark
> Jones,
> 14/12/18 13:00:53 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(Mark Jones, );
> users with modify permissions: Set(Mark Jones, )
> 14/12/18 13:00:53 INFO Slf4jLogger: Slf4jLogger started
> 14/12/18 13:00:53 INFO Remoting: Starting remoting
> 14/12/18 13:00:53 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@192.168.19.83:54548]
> 14/12/18 13:00:53 INFO Remoting: Remoting now listens on addresses:
> [akka.tcp://sparkDriver@192.168.19.83:54548]
> 14/12/18 13:00:53 INFO Utils: Successfully started service 'sparkDriver' on
> port 54548.
> 14/12/18 13:00:53 INFO SparkEnv: Registering MapOutputTracker
> 14/12/18 13:00:53 INFO SparkEnv: Registering BlockManagerMaster
> 14/12/18 13:00:53 INFO DiskBlockManager: Created local directory at
> C:\Users\MARKJO~1\AppData\Local\Temp\spark-local-20141218130053-1ab9
> 14/12/18 13:00:53 INFO Utils: Successfully started service 'Connection
> manager for block manager' on port 54551.
> 14/12/18 13:00:53 INFO ConnectionManager: Bound socket to port 54551 with
> id
> = ConnectionManagerId(192.168.19.83,54551)
> 14/12/18 13:00:53 INFO MemoryStore: MemoryStore started with capacity 265.1
> MB
> 14/12/18 13:00:53 INFO BlockManagerMaster: Trying to register BlockManager
> 14/12/18 13:00:53 INFO BlockManagerMasterActor: Registering block manager
> 192.168.19.83:54551 with 265.1 MB RAM
> 14/12/18 13:00:53 INFO BlockManagerMaster: Registered BlockManager
> 14/12/18 13:00:53 INFO HttpFileServer: HTTP File server directory is
>
> C:\Users\MARKJO~1\AppData\Local\Temp\spark-a43340e8-2621-46b8-a44e-8874dd178393
> 14/12/18 13:00:53 INFO HttpServer: Starting HTTP Server
> 14/12/18 13:00:54 INFO Utils: Successfully started service 'HTTP file
> server' on port 54552.
> 14/12/18 13:00:54 INFO Utils: Successfully started service 'SparkUI' on
> port
> 4040.
> 14/12/18 13:00:54 INFO SparkUI: Started SparkUI at
> http://192.168.19.83:4040
> 14/12/18 13:00:54 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 14/12/18 13:00:54 ERROR Shell: Failed to locate the winutils binary in the
> hadoop binary path
> java.io.IOException: Could not locate executable null\bin\winutils.exe in
> the Hadoop binaries.
> at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
> at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
> at org.apache.hadoop.util.Shell.(Shell.java:326)
> at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
> at
> org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
> at org.apache.hadoop.security.Groups.(Groups.java:77)
> at
>
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
> at
>
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
> at
>
> org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
> at
> org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:36)
> at
> org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:109)
> at
> org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala)
> at org.apache.spark.SparkContext.(SparkContext.scala:228)
> at
>
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:53)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:214

Downloads from S3 exceedingly slow when running on spark-ec2

2014-12-18 Thread Jon Chase
I'm running a very simple Spark application that downloads files from S3,
does a bit of mapping, then uploads new files.  Each file is roughly 2MB
and is gzip'd.  I was running the same code on Amazon's EMR w/Spark and not
having any download speed issues (Amazon's EMR provides a custom
implementation of the s3n:// file system, FWIW).

When I say exceedingly slow, I mean that it takes about 2 minutes to
download and process a 2MB file (this was taking ~2 seconds on the same
instance types in Amazon's EMR).  When I download the same file from the
EC2 machine with wget or curl, it downloads in ~ 1 second.  I've also done
other bandwidth checks for downloads from other external hosts - no speed
problems there.

Tried this w/Spark 1.1.0 and 1.1.1.

When I do a thread dump on a worker, I typically see this a lot:



"Executor task launch worker-7" daemon prio=10 tid=0x7fd174039000
nid=0x59e9 runnable [0x7fd1f7dfb000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.read(InputRecord.java:480)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
- locked <0x0007e44dd140> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
- locked <0x0007e44e1350> (a sun.security.ssl.AppInputStream)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
- locked <0x0007e44ea800> (a java.io.BufferedInputStream)
at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
at
org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
at
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413)
at
org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
at
org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
at
org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
at
org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
at
org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
at
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
at
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
at
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
at
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
at
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
at
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source)
at
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:330)
at
org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:432)
at
org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:425)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126)
at
org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:256)
at
org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244)
at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:126)
at
org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.getBaseRecordWriter(MultipleTextOutputFormat.java:44)
at
org.apache.hadoop.mapred.lib.MultipleOutputFormat$1.write(MultipleOutputFormat.java:99)
at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:94)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:986)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
at org.apache.spark.scheduler.ResultTask.runTask(Re

Re: Help with updateStateByKey

2014-12-18 Thread Silvio Fiorito
Hi Pierce,

You shouldn’t have to use groupByKey because updateStateByKey will get a 
Seq of all the values for that key already.

I used that for realtime sessionization as well. What I did was key my 
incoming events, then send them to updateStateByKey. The updateStateByKey 
function then received a Seq of the events and the Option of the previous 
state for that key. The sessionization code then did its thing to check if 
the incoming events were part of the same session, based on a configured 
timeout. If a session already was active (from the previous state) and it 
hadn’t exceeded the timeout, it used that value. Otherwise it generated a 
new session id. Then the return value for the updateStateByKey function 
was a Tuple of session id and last timestamp.

Then I joined the DStream with the session ids, which were both keyed off 
the same id and continued my processing. Your requirements may be 
different, but that’s what worked well for me.

Another thing to consider is cleaning up old sessions by returning None in 
the updateStateByKey function. This will help with long running apps and 
minimize memory usage (and checkpoint size).

I was using something similar to the method above on a live production 
stream with very little CPU and memory footprint, running for weeks at a 
time, processing up to 15M events per day with fluctuating traffic.
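
A minimal sketch of the timeout-based cleanup mentioned above (the state shape,
the idea of tracking the last timestamp per key, and the timeout value are
illustrative assumptions, not the exact production code):

// Values are event timestamps in ms; state per key is (sessionId, lastTimestamp).
val sessionTimeoutMs = 30 * 60 * 1000L  // made-up timeout

def updateSession(newTimestamps: Seq[Long],
                  state: Option[(String, Long)]): Option[(String, Long)] = {
  val now = System.currentTimeMillis()
  val latest = (newTimestamps ++ state.map(_._2)).reduceOption(_ max _)
  latest match {
    case Some(ts) if now - ts > sessionTimeoutMs =>
      None  // expired: returning None drops this key from the state
    case Some(ts) =>
      val sessionId = state.map(_._1).getOrElse(java.util.UUID.randomUUID().toString)
      Some((sessionId, ts))
    case None => None
  }
}

// keyedEvents: DStream[(String, Long)]
// val sessions = keyedEvents.updateStateByKey(updateSession _)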

Thanks,
Silvio



On 12/17/14, 10:07 PM, "Pierce Lamb"  wrote:

>I am trying to run stateful Spark Streaming computations over (fake)
>apache web server logs read from Kafka. The goal is to "sessionize"
>the web traffic similar to this blog post:
>http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionizat
>ion-with-spark-streaming-and-apache-hadoop/
>
>The only difference is that I want to "sessionize" each page the IP
>hits, instead of the entire session. I was able to do this reading
>from a file of fake web traffic using Spark in batch mode, but now I
>want to do it in a streaming context.
>
>Log files are read from Kafka and parsed into K/V pairs of
>
>(String, (String, Long, Long)) or
>
>(IP, (requestPage, time, time))
>
>I then call "groupByKey()" on this K/V pair. In batch mode, this would
>produce a:
>
>(String, CollectionBuffer((String, Long, Long), ...) or
>
>(IP, CollectionBuffer((requestPage, time, time), ...)
>
>In a StreamingContext, it produces a:
>
>(String, ArrayBuffer((String, Long, Long), ...) like so:
>
>(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>
>However, as the next microbatch (DStream) arrives, this information is
>discarded. Ultimately what I want is for that ArrayBuffer to fill up
>over time as a given IP continues to interact and to run some
>computations on its data to "sessionize" the page time. I believe the
>operator to make that happen is "updateStateByKey." I'm having some
>trouble with this operator (I'm new to both Spark & Scala); any help
>is appreciated.
>
>Thus far:
>
>val grouped = 
>ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
>
>
>def updateGroupByKey(
>  a: Seq[(String, ArrayBuffer[(String,
>Long, Long)])],
>  b: Option[(String, ArrayBuffer[(String,
>Long, Long)])]
>  ): Option[(String, ArrayBuffer[(String,
>Long, Long)])] = {
>
>  }
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Spark 1.2 Release Date

2014-12-18 Thread Al M
Is there a planned release date for Spark 1.2?  I saw on the Spark Wiki that we
are already in the latter part of the release window.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date-tp20765.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.2 Release Date

2014-12-18 Thread Silvio Fiorito
It’s on Maven Central already http://search.maven.org/#browse%7C717101892






On 12/18/14, 2:09 PM, "Al M"  wrote:

>Is there a planned release date for Spark 1.2?  I saw on the  Spark Wiki
>   that 
>we
>are already in the latter part of the release window.
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date
>-tp20765.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


EC2 VPC script

2014-12-18 Thread Eduardo Cusa
Hi guys.

I ran the following command to launch a new cluster:

./spark-ec2 -k test -i test.pem -s 1  --vpc-id vpc-X --subnet-id
subnet-X launch  vpc_spark

The instances started OK, but the command never finishes, with the following
output:


Setting up security groups...
Searching for existing cluster vpc_spark...
Spark AMI: ami-5bb18832
Launching instances...
Launched 1 slaves in us-east-1a, regid = r-e9d603c4
Launched master in us-east-1a, regid = r-89d104a4
Waiting for cluster to enter 'ssh-ready' state...


Any ideas what happened?


regards
Eduardo



Re: Spark 1.2 Release Date

2014-12-18 Thread nitin
Soon enough :)

http://apache-spark-developers-list.1001551.n3.nabble.com/RESULT-VOTE-Release-Apache-Spark-1-2-0-RC2-td9815.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date-tp20765p20766.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Effects problems in logistic regression

2014-12-18 Thread Franco Barrientos
Hi all!,

 

I have a problem with LogisticRegressionWithSGD. When I train a data set
with one variable (which is the amount of an item) plus an intercept, I get weights
of

(-0.4021, -207.1749) for the two features, respectively. This doesn't make sense
to me, because when I run a logistic regression on the same data in SAS I get
these weights (-2.6604, 0.000245).

 

The range of this variable is from 0 to 59102, with a mean of 1158.

 

The problem is that when I want to calculate the probabilities for each user in
the data set, the probability is zero or near zero in many cases, because
when Spark calculates exp(-1*(-0.4021+(-207.1749)*amount)) the result is a huge
number, in fact infinity for Spark.

 

How can I treat this variable? And why does this happen?

 

Thanks ,

 

Franco Barrientos
Data Scientist

Málaga #115, Of. 1003, Las Condes.
Santiago, Chile.
(+562)-29699649
(+569)-76347893

  franco.barrien...@exalitica.com 

  www.exalitica.com


   

 



Standalone Spark program

2014-12-18 Thread Akshat Aranya
Hi,

I am building a Spark-based service which requires initialization of a
SparkContext in a main():

def main(args: Array[String]) {
val conf = new SparkConf(false)
  .setMaster("spark://foo.example.com:7077")
  .setAppName("foobar")

val sc = new SparkContext(conf)
val rdd = sc.parallelize(0 until 255)
val res =  rdd.mapPartitions(it => it).take(1)
println(s"res=$res")
sc.stop()
}

This code works fine via REPL, but not as a standalone program; it causes a
ClassNotFoundException.  This has me confused about how code is shipped out
to executors.  When using via REPL, does the mapPartitions closure, it=>it,
get sent out when the REPL statement is executed?  When this code is run as
a standalone program (not via spark-submit), is the compiled code expected
to be present at the executor?

Thanks,
Akshat


Re: Spark 1.2 Release Date

2014-12-18 Thread Al M
Awesome.  Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date-tp20765p20767.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Effects problems in logistic regression

2014-12-18 Thread Sean Owen
Are you sure this is an apples-to-apples comparison? for example does your
SAS process normalize or otherwise transform the data first?

Is the optimization configured similarly in both cases -- same
regularization, etc.?

Are you sure you are pulling out the intercept correctly? It is a separate
value from the logistic regression model in Spark.

On Thu, Dec 18, 2014 at 4:34 PM, Franco Barrientos <
franco.barrien...@exalitica.com> wrote:
>
> Hi all!,
>
>
>
> I have a problem with LogisticRegressionWithSGD, when I train a data set
> with one variable (wich is a amount of an item) and intercept, I get
> weights of
>
> (-0.4021,-207.1749) for both features, respectively. This don´t make sense
> to me because I run a logistic regression for the same data in SAS and I
> get these weights (-2.6604,0.000245).
>
>
>
> The rank of this variable is from 0 to 59102 with a mean of 1158.
>
>
>
> The problem is when I want to calculate the probabilities for each user
> from data set, this probability is near to zero or zero in much cases,
> because when spark calculates exp(-1*(-0.4021+(-207.1749)*amount)) this is
> a big number, in fact infinity for spark.
>
>
>
> How can I treat this variable? or why this happened?
>
>
>
> Thanks ,
>
>
>
> *Franco Barrientos*
> Data Scientist
>
> Málaga #115, Of. 1003, Las Condes.
> Santiago, Chile.
> (+562)-29699649
> (+569)-76347893
>
> franco.barrien...@exalitica.com
>
> www.exalitica.com
>
> [image: http://exalitica.com/web/img/frim.png]
>
>
>


Re: Standalone Spark program

2014-12-18 Thread Akhil Das
You can build a jar of your project and add it to the SparkContext
(sc.addJar("/path/to/your/project.jar")); it will then get shipped to the
workers, and hence no ClassNotFoundException!
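
For illustration, the same thing expressed through SparkConf (the jar path is
hypothetical; the master URL and app name follow the original snippet):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf(false)
  .setMaster("spark://foo.example.com:7077")
  .setAppName("foobar")
  // Ship the application jar to the executors so its classes can be loaded there.
  .setJars(Seq("/path/to/your/project.jar"))

val sc = new SparkContext(conf)
// Alternatively, after the context exists: sc.addJar("/path/to/your/project.jar")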

Thanks
Best Regards

On Thu, Dec 18, 2014 at 10:06 PM, Akshat Aranya  wrote:
>
> Hi,
>
> I am building a Spark-based service which requires initialization of a
> SparkContext in a main():
>
> def main(args: Array[String]) {
> val conf = new SparkConf(false)
>   .setMaster("spark://foo.example.com:7077")
>   .setAppName("foobar")
>
> val sc = new SparkContext(conf)
> val rdd = sc.parallelize(0 until 255)
> val res =  rdd.mapPartitions(it => it).take(1)
> println(s"res=$res")
> sc.stop()
> }
>
> This code works fine via REPL, but not as a standalone program; it causes
> a ClassNotFoundException.  This has me confused about how code is shipped
> out to executors.  When using via REPL, does the mapPartitions closure,
> it=>it, get sent out when the REPL statement is executed?  When this code
> is run as a standalone program (not via spark-submit), is the compiled code
> expected to be present at the the executor?
>
> Thanks,
> Akshat
>
>


RE: Effects problems in logistic regression

2014-12-18 Thread Franco Barrientos
Yes, without the “amounts” variables the results are similar. When I put in other 
variables it's fine.

 

De: Sean Owen [mailto:so...@cloudera.com] 
Enviado el: jueves, 18 de diciembre de 2014 14:22
Para: Franco Barrientos
CC: user@spark.apache.org
Asunto: Re: Effects problems in logistic regression

 

Are you sure this is an apples-to-apples comparison? for example does your SAS 
process normalize or otherwise transform the data first? 

 

Is the optimization configured similarly in both cases -- same regularization, 
etc.?

 

Are you sure you are pulling out the intercept correctly? It is a separate 
value from the logistic regression model in Spark.

 

On Thu, Dec 18, 2014 at 4:34 PM, Franco Barrientos  wrote:

Hi all!,

 

I have a problem with LogisticRegressionWithSGD, when I train a data set with 
one variable (wich is a amount of an item) and intercept, I get weights of

(-0.4021,-207.1749) for both features, respectively. This don´t make sense to 
me because I run a logistic regression for the same data in SAS and I get these 
weights (-2.6604,0.000245).

 

The rank of this variable is from 0 to 59102 with a mean of 1158.

 

The problem is when I want to calculate the probabilities for each user from 
data set, this probability is near to zero or zero in much cases, because when 
spark calculates exp(-1*(-0.4021+(-207.1749)*amount)) this is a big number, in 
fact infinity for spark.

 

How can I treat this variable? or why this happened? 

 

Thanks ,

 

Franco Barrientos
Data Scientist

Málaga #115, Of. 1003, Las Condes.
Santiago, Chile.
(+562)-29699649  
(+569)-76347893  

franco.barrien...@exalitica.com   

www.exalitica.com  


   

 



Re: Effects problems in logistic regression

2014-12-18 Thread DB Tsai
Can you try LogisticRegressionWithLBFGS? I verified that it converges
to the same result as a model trained by R's glmnet package without
regularization. The problem with LogisticRegressionWithSGD is that it's very slow
to converge, and a lot of the time it's very sensitive to the step size,
which can lead to a wrong answer.

The regularization logic in MLlib is not entirely correct, and it will
penalize the intercept. In general, with really high regularization, all
the coefficients will be zeros except the intercept. In logistic
regression, the non-zero intercept can be understood as the
prior probability of each class, and in linear regression, it will be the
mean of the response. I'll have a PR to fix this issue.
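
A minimal sketch of switching to LogisticRegressionWithLBFGS, standardizing the
wide-range amount feature first (standardization is an extra precaution rather
than something from this thread, and the training RDD name is hypothetical):

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.regression.LabeledPoint

// training: RDD[LabeledPoint] with the raw amount feature (dense vectors assumed,
// since withMean = true does not work on sparse input).
val scaler = new StandardScaler(withMean = true, withStd = true)
  .fit(training.map(_.features))
val scaled = training.map(p => LabeledPoint(p.label, scaler.transform(p.features)))

val model = new LogisticRegressionWithLBFGS()
  .setIntercept(true)
  .run(scaled)
println(s"weights=${model.weights} intercept=${model.intercept}")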


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Thu, Dec 18, 2014 at 10:50 AM, Franco Barrientos <
franco.barrien...@exalitica.com> wrote:
>
> Yes, without the “amounts” variables the results are similiar. When I put
> other variables its fine.
>
>
>
> *De:* Sean Owen [mailto:so...@cloudera.com]
> *Enviado el:* jueves, 18 de diciembre de 2014 14:22
> *Para:* Franco Barrientos
> *CC:* user@spark.apache.org
> *Asunto:* Re: Effects problems in logistic regression
>
>
>
> Are you sure this is an apples-to-apples comparison? for example does your
> SAS process normalize or otherwise transform the data first?
>
>
>
> Is the optimization configured similarly in both cases -- same
> regularization, etc.?
>
>
>
> Are you sure you are pulling out the intercept correctly? It is a separate
> value from the logistic regression model in Spark.
>
>
>
> On Thu, Dec 18, 2014 at 4:34 PM, Franco Barrientos <
> franco.barrien...@exalitica.com> wrote:
>
> Hi all!,
>
>
>
> I have a problem with LogisticRegressionWithSGD, when I train a data set
> with one variable (wich is a amount of an item) and intercept, I get
> weights of
>
> (-0.4021,-207.1749) for both features, respectively. This don´t make sense
> to me because I run a logistic regression for the same data in SAS and I
> get these weights (-2.6604,0.000245).
>
>
>
> The rank of this variable is from 0 to 59102 with a mean of 1158.
>
>
>
> The problem is when I want to calculate the probabilities for each user
> from data set, this probability is near to zero or zero in much cases,
> because when spark calculates exp(-1*(-0.4021+(-207.1749)*amount)) this is
> a big number, in fact infinity for spark.
>
>
>
> How can I treat this variable? or why this happened?
>
>
>
> Thanks ,
>
>
>
> *Franco Barrientos*
> Data Scientist
>
> Málaga #115, Of. 1003, Las Condes.
> Santiago, Chile.
> (+562)-29699649
> (+569)-76347893
>
> franco.barrien...@exalitica.com
>
> www.exalitica.com
>
> [image: http://exalitica.com/web/img/frim.png]
>
>
>
>


Re: Help with updateStateByKey

2014-12-18 Thread Pierce Lamb
Hi Silvio,

This is a great suggestion (I wanted to get rid of groupByKey), I have been
trying to implement it this morning, but having some trouble. I would love
to see your code for the function that goes inside updateStateByKey

Here is my current code:

 def updateGroupByKey( newValues: Seq[(String, Long, Long)],
  currentValue: Option[Seq[(String, Long, Long)]]
  ): Option[Seq[(String, Long, Long)]] = {

  currentValue.map{ case (v) => v ++ newValues
  }
}

val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)


However, when I run it the grouped DStream doesn't get populated with
anything. The issue is probably that currentValue is not actually an
Option[Seq[triple]] but rather an Option[triple]. However if I change it to
an Option[triple] then I have to also return an Option[triple] for
updateStateByKey to compile, but I want that return value to be an
Option[Seq[triple]] because ultimately i want the data to look like
(IPaddress, Seq[(pageRequested, startTime, EndTime), (pageRequested,
startTime, EndTime)...]) and have that Seq build over time

Am I thinking about this wrong?

Thank you

On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:
>
> Hi Pierce,
>
> You shouldn’t have to use groupByKey because updateStateByKey will get a
> Seq of all the values for that key already.
>
> I used that for realtime sessionization as well. What I did was key my
> incoming events, then send them to udpateStateByKey. The updateStateByKey
> function then received a Seq of the events and the Option of the previous
> state for that key. The sessionization code then did its thing to check if
> the incoming events were part of the same session, based on a configured
> timeout. If a session already was active (from the previous state) and it
> hadn’t exceeded the timeout, it used that value. Otherwise it generated a
> new session id. Then the return value for the updateStateByKey function
> was a Tuple of session id and last timestamp.
>
> Then I joined the DStream with the session ids, which were both keyed off
> the same id and continued my processing. Your requirements may be
> different, but that’s what worked well for me.
>
> Another thing to consider is cleaning up old sessions by returning None in
> the updateStateByKey function. This will help with long running apps and
> minimize memory usage (and checkpoint size).
>
> I was using something similar to the method above on a live production
> stream with very little CPU and memory footprint, running for weeks at a
> time, processing up to 15M events per day with fluctuating traffic.
>
> Thanks,
> Silvio
>
>
>
> On 12/17/14, 10:07 PM, "Pierce Lamb" 
> wrote:
>
> >I am trying to run stateful Spark Streaming computations over (fake)
> >apache web server logs read from Kafka. The goal is to "sessionize"
> >the web traffic similar to this blog post:
> >
> http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionizat
> >ion-with-spark-streaming-and-apache-hadoop/
> >
> >The only difference is that I want to "sessionize" each page the IP
> >hits, instead of the entire session. I was able to do this reading
> >from a file of fake web traffic using Spark in batch mode, but now I
> >want to do it in a streaming context.
> >
> >Log files are read from Kafka and parsed into K/V pairs of
> >
> >(String, (String, Long, Long)) or
> >
> >(IP, (requestPage, time, time))
> >
> >I then call "groupByKey()" on this K/V pair. In batch mode, this would
> >produce a:
> >
> >(String, CollectionBuffer((String, Long, Long), ...) or
> >
> >(IP, CollectionBuffer((requestPage, time, time), ...)
> >
> >In a StreamingContext, it produces a:
> >
> >(String, ArrayBuffer((String, Long, Long), ...) like so:
> >
> >(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
> >
> >However, as the next microbatch (DStream) arrives, this information is
> >discarded. Ultimately what I want is for that ArrayBuffer to fill up
> >over time as a given IP continues to interact and to run some
> >computations on its data to "sessionize" the page time. I believe the
> >operator to make that happen is "updateStateByKey." I'm having some
> >trouble with this operator (I'm new to both Spark & Scala); any help
> >is appreciated.
> >
> >Thus far:
> >
> >val grouped =
> >ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
> >
> >
> >def updateGroupByKey(
> >  a: Seq[(String, ArrayBuffer[(String,
> >Long, Long)])],
> >  b: Option[(String, ArrayBuffer[(String,
> >Long, Long)])]
> >  ): Option[(String, ArrayBuffer[(String,
> >Long, Long)])] = {
> >
> >  }
> >
> >-
> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >For additional commands, e-mail: user-h...@spark.apache.org
> >
>


RE: Effects problems in logistic regression

2014-12-18 Thread Franco Barrientos
Thanks I will try.

 

De: DB Tsai [mailto:dbt...@dbtsai.com] 
Enviado el: jueves, 18 de diciembre de 2014 16:24
Para: Franco Barrientos
CC: Sean Owen; user@spark.apache.org
Asunto: Re: Effects problems in logistic regression

 

Can you try LogisticRegressionWithLBFGS? I verified that this will be converged 
to the same result trained by R's glmnet package without regularization. The 
problem of LogisticRegressionWithSGD is it's very slow in term of converging, 
and lots of time, it's very sensitive to stepsize which can lead to wrong 
answer. 

 

The regularization logic in MLLib is not entirely correct, and it will penalize 
the intercept. In general, with really high regularization, all the 
coefficients will be zeros except the intercept. In logistic regression, the 
non-zero intercept can be understood as the prior-probability of each class, 
and in linear regression, this will be the mean of response. I'll have a PR to 
fix this issue.





Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

 

On Thu, Dec 18, 2014 at 10:50 AM, Franco Barrientos  wrote:

Yes, without the “amounts” variables the results are similiar. When I put other 
variables its fine.

 

De: Sean Owen [mailto:so...@cloudera.com  ] 
Enviado el: jueves, 18 de diciembre de 2014 14:22
Para: Franco Barrientos
CC: user@spark.apache.org  
Asunto: Re: Effects problems in logistic regression

 

Are you sure this is an apples-to-apples comparison? for example does your SAS 
process normalize or otherwise transform the data first? 

 

Is the optimization configured similarly in both cases -- same regularization, 
etc.?

 

Are you sure you are pulling out the intercept correctly? It is a separate 
value from the logistic regression model in Spark.

 

On Thu, Dec 18, 2014 at 4:34 PM, Franco Barrientos  wrote:

Hi all!,

 

I have a problem with LogisticRegressionWithSGD, when I train a data set with 
one variable (wich is a amount of an item) and intercept, I get weights of

(-0.4021,-207.1749) for both features, respectively. This don´t make sense to 
me because I run a logistic regression for the same data in SAS and I get these 
weights (-2.6604,0.000245).

 

The rank of this variable is from 0 to 59102 with a mean of 1158.

 

The problem is when I want to calculate the probabilities for each user from 
data set, this probability is near to zero or zero in much cases, because when 
spark calculates exp(-1*(-0.4021+(-207.1749)*amount)) this is a big number, in 
fact infinity for spark.

 

How can I treat this variable? or why this happened? 

 

Thanks ,

 

Franco Barrientos
Data Scientist

Málaga #115, Of. 1003, Las Condes.
Santiago, Chile.
(+562)-29699649  
(+569)-76347893  

franco.barrien...@exalitica.com   

www.exalitica.com  


   

 



Re: When will Spark SQL support building DB index natively?

2014-12-18 Thread Michael Armbrust
It is implemented in the same way as Hive and interoperates with the Hive
metastore.  In 1.2 we are considering adding partitioning to the Spark SQL
data source API as well.  However, for now, you should create a HiveContext
and a partitioned table.  Spark SQL will automatically select
partitions when there are predicates in a query against the partitioning
columns.
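
A rough sketch of that setup (the table and column names are made up):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Partition the table on the column used for lookups, e.g. customer_id.
hiveContext.hql("""CREATE TABLE IF NOT EXISTS events (event STRING, amount DOUBLE)
                   PARTITIONED BY (customer_id STRING)""")
// A predicate on the partitioning column prunes the scan to matching partitions.
hiveContext.hql("SELECT COUNT(*) FROM events WHERE customer_id = '42'").collect()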

On Wed, Dec 17, 2014 at 7:31 PM, Xuelin Cao  wrote:
>
> Thanks, I didn't try the partitioned table support (sounds like a hive
> feature)
>
> Is there any guideline? Should I use hiveContext to create the table with
> partition firstly?
>
>
>   On Thursday, December 18, 2014 2:28 AM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>
> - Dev list
>
> Have you looked at partitioned table support?  That would only scan data
> where the predicate matches the partition.  Depending on the cardinality of
> the customerId column that could be a good option for you.
>
> On Wed, Dec 17, 2014 at 2:25 AM, Xuelin Cao 
> wrote:
>
>
> Hi,
>  In Spark SQL help document, it says "Some of these (such as indexes)
> are less important due to Spark SQL’s in-memory  computational model.
> Others are slotted for future releases of Spark SQL.
>- Block level bitmap indexes and virtual columns (used to build
> indexes)"
>
>  For our use cases, DB index is quite important. I have about 300G
> data in our database, and we always use "customer id" as a predicate for DB
> look up.  Without DB index, we will have to scan all 300G data, and it will
> take > 1 minute for a simple DB look up, while MySQL only takes 10 seconds.
> We tried to create an independent table for each "customer id", the result
> is pretty good, but the logic will be very complex.
>  I'm wondering when will Spark SQL supports DB index, and before that,
> is there an alternative way to support DB index function?
> Thanks
>
>
>
>


does spark sql support columnar compression with encoding when caching tables

2014-12-18 Thread Sadhan Sood
Hi All,

I'm wondering, when caching a table backed by LZO-compressed Parquet data, whether
Spark also compresses it (using lzo/gzip/snappy) along with the column-level
encoding, or just does the column-level encoding, when
spark.sql.inMemoryColumnarStorage.compressed is set to true. I ask because when I
try to cache the data, I notice the memory being used is almost as much as the
uncompressed size of the data.

Thanks!
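
For reference, a minimal sketch of how that setting is applied before caching
(assuming an existing SQLContext and a hypothetical registered table name):

// Enable compression for the in-memory columnar cache, then cache the table.
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
sqlContext.cacheTable("events")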


Re: Help with updateStateByKey

2014-12-18 Thread Silvio Fiorito
Ok, I have a better idea of what you’re trying to do now.

I think the prob might be the map. The first time the function runs, 
currentValue will be None. Using map on None returns None.

Instead, try:

Some(currentValue.getOrElse(Seq.empty) ++ newValues)

I think that should give you the expected result.
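
Putting that together with the signature from your code, a minimal sketch of the
full update function:

def updateGroupByKey(newValues: Seq[(String, Long, Long)],
                     currentValue: Option[Seq[(String, Long, Long)]]
                    ): Option[Seq[(String, Long, Long)]] = {
  // Start from the previous Seq (empty on the first batch) and append the new values.
  Some(currentValue.getOrElse(Seq.empty) ++ newValues)
}

val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)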


From: Pierce Lamb 
Date: Thursday, December 18, 2014 at 2:31 PM
To: Silvio Fiorito 
Cc: "user@spark.apache.org" 
Subject: Re: Help with updateStateByKey

Hi Silvio,

This is a great suggestion (I wanted to get rid of groupByKey), I have been 
trying to implement it this morning, but having some trouble. I would love to 
see your code for the function that goes inside updateStateByKey

Here is my current code:

 def updateGroupByKey( newValues: Seq[(String, Long, Long)],
  currentValue: Option[Seq[(String, Long, Long)]]
  ): Option[Seq[(String, Long, Long)]] = {

  currentValue.map{ case (v) => v ++ newValues
  }
}

val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)


However, when I run it the grouped DStream doesn't get populated with anything. 
The issue is probably that currentValue is not actually an Option[Seq[triple]] 
but rather an Option[triple]. However if I change it to an Option[triple] then 
I have to also return an Option[triple] for updateStateByKey to compile, but I 
want that return value to be an Option[Seq[triple]] because ultimately i want 
the data to look like (IPaddress, Seq[(pageRequested, startTime, EndTime), 
(pageRequested, startTime, EndTime)...]) and have that Seq build over time

Am I thinking about this wrong?

Thank you

On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito  wrote:
Hi Pierce,

You shouldn’t have to use groupByKey because updateStateByKey will get a
Seq of all the values for that key already.

I used that for realtime sessionization as well. What I did was key my
incoming events, then send them to udpateStateByKey. The updateStateByKey
function then received a Seq of the events and the Option of the previous
state for that key. The sessionization code then did its thing to check if
the incoming events were part of the same session, based on a configured
timeout. If a session already was active (from the previous state) and it
hadn’t exceeded the timeout, it used that value. Otherwise it generated a
new session id. Then the return value for the updateStateByKey function
was a Tuple of session id and last timestamp.

Then I joined the DStream with the session ids, which were both keyed off
the same id and continued my processing. Your requirements may be
different, but that’s what worked well for me.

Another thing to consider is cleaning up old sessions by returning None in
the updateStateByKey function. This will help with long running apps and
minimize memory usage (and checkpoint size).

I was using something similar to the method above on a live production
stream with very little CPU and memory footprint, running for weeks at a
time, processing up to 15M events per day with fluctuating traffic.

Thanks,
Silvio



On 12/17/14, 10:07 PM, "Pierce Lamb"  wrote:

>I am trying to run stateful Spark Streaming computations over (fake)
>apache web server logs read from Kafka. The goal is to "sessionize"
>the web traffic similar to this blog post:
>http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionizat
>ion-with-spark-streaming-and-apache-hadoop/
>
>The only difference is that I want to "sessionize" each page the IP
>hits, instead of the entire session. I was able to do this reading
>from a file of fake web traffic using Spark in batch mode, but now I
>want to do it in a streaming context.
>
>Log files are read from Kafka and parsed into K/V pairs of
>
>(String, (String, Long, Long)) or
>
>(IP, (requestPage, time, time))
>
>I then call "groupByKey()" on this K/V pair. In batch mode, this would
>produce a:
>
>(String, CollectionBuffer((String, Long, Long), ...) or
>
>(IP, CollectionBuffer((requestPage, time, time), ...)
>
>In a StreamingContext, it produces a:
>
>(String, ArrayBuffer((String, Long, Long), ...) like so:
>
>(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>
>However, as the next microbatch (DStream) arrives, this information is
>discarded. Ultimately what I want is for that ArrayBuffer to fill up
>over time as a given IP continues to interact and to run some
>computations on its data to "sessionize" the page time. I believe the
>operator to make that happen is "updateStateByKey." I'm having some
>trouble with this operator (I'm new to both Spark & Scala); any help
>is appreciated.
>
>Thus far:
>
>val grouped =
>ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
>
>
>def updateGroupByKey(
>   

Re: Help with updateStateByKey

2014-12-18 Thread Pierce Lamb
This produces the expected output, thank you!

On Thu, Dec 18, 2014 at 12:11 PM, Silvio Fiorito
 wrote:
> Ok, I have a better idea of what you’re trying to do now.
>
> I think the prob might be the map. The first time the function runs,
> currentValue will be None. Using map on None returns None.
>
> Instead, try:
>
> Some(currentValue.getOrElse(Seq.empty) ++ newValues)
>
> I think that should give you the expected result.
>
>
> From: Pierce Lamb 
> Date: Thursday, December 18, 2014 at 2:31 PM
> To: Silvio Fiorito 
> Cc: "user@spark.apache.org" 
> Subject: Re: Help with updateStateByKey
>
> Hi Silvio,
>
> This is a great suggestion (I wanted to get rid of groupByKey), I have been
> trying to implement it this morning, but having some trouble. I would love
> to see your code for the function that goes inside updateStateByKey
>
> Here is my current code:
>
>  def updateGroupByKey( newValues: Seq[(String, Long, Long)],
>   currentValue: Option[Seq[(String, Long, Long)]]
>   ): Option[Seq[(String, Long, Long)]] = {
>
>   currentValue.map{ case (v) => v ++ newValues
>   }
> }
>
> val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)
>
>
> However, when I run it the grouped DStream doesn't get populated with
> anything. The issue is probably that currentValue is not actually an
> Option[Seq[triple]] but rather an Option[triple]. However if I change it to
> an Option[triple] then I have to also return an Option[triple] for
> updateStateByKey to compile, but I want that return value to be an
> Option[Seq[triple]] because ultimately i want the data to look like
> (IPaddress, Seq[(pageRequested, startTime, EndTime), (pageRequested,
> startTime, EndTime)...]) and have that Seq build over time
>
> Am I thinking about this wrong?
>
> Thank you
>
> On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito
>  wrote:
>>
>> Hi Pierce,
>>
>> You shouldn’t have to use groupByKey because updateStateByKey will get a
>> Seq of all the values for that key already.
>>
>> I used that for realtime sessionization as well. What I did was key my
>> incoming events, then send them to udpateStateByKey. The updateStateByKey
>> function then received a Seq of the events and the Option of the previous
>> state for that key. The sessionization code then did its thing to check if
>> the incoming events were part of the same session, based on a configured
>> timeout. If a session already was active (from the previous state) and it
>> hadn’t exceeded the timeout, it used that value. Otherwise it generated a
>> new session id. Then the return value for the updateStateByKey function
>> was a Tuple of session id and last timestamp.
>>
>> Then I joined the DStream with the session ids, which were both keyed off
>> the same id and continued my processing. Your requirements may be
>> different, but that’s what worked well for me.
>>
>> Another thing to consider is cleaning up old sessions by returning None in
>> the updateStateByKey function. This will help with long running apps and
>> minimize memory usage (and checkpoint size).
>>
>> I was using something similar to the method above on a live production
>> stream with very little CPU and memory footprint, running for weeks at a
>> time, processing up to 15M events per day with fluctuating traffic.
>>
>> Thanks,
>> Silvio
>>
>>
>>
>> On 12/17/14, 10:07 PM, "Pierce Lamb" 
>> wrote:
>>
>> >I am trying to run stateful Spark Streaming computations over (fake)
>> >apache web server logs read from Kafka. The goal is to "sessionize"
>> >the web traffic similar to this blog post:
>>
>> >http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
>> >
>> >The only difference is that I want to "sessionize" each page the IP
>> >hits, instead of the entire session. I was able to do this reading
>> >from a file of fake web traffic using Spark in batch mode, but now I
>> >want to do it in a streaming context.
>> >
>> >Log files are read from Kafka and parsed into K/V pairs of
>> >
>> >(String, (String, Long, Long)) or
>> >
>> >(IP, (requestPage, time, time))
>> >
>> >I then call "groupByKey()" on this K/V pair. In batch mode, this would
>> >produce a:
>> >
>> >(String, CollectionBuffer((String, Long, Long), ...) or
>> >
>> >(IP, CollectionBuffer((requestPage, time, time), ...)
>> >
>> >In a StreamingContext, it produces a:
>> >
>> >(String, ArrayBuffer((String, Long, Long), ...) like so:
>> >
>> >(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>> >
>> >However, as the next microbatch (DStream) arrives, this information is
>> >discarded. Ultimately what I want is for that ArrayBuffer to fill up
>> >over time as a given IP continues to interact and to run some
>> >computations on its data to "sessionize" the page time. I believe the
>> >operator to make that happen is "updateStateByKey." I'm having some
>> >trouble with this operator (I'm new to both Spark & Scala); 

Re: Help with updateStateByKey

2014-12-18 Thread Silvio Fiorito
Great, glad it worked out! Just keep an eye on memory usage as you roll it 
out. Like I said before, if you’ll be running this 24/7 consider cleaning 
up sessions by returning None after some sort of timeout.
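
A rough sketch of that cleanup, assuming the (page, startTime, endTime) triples used earlier in the thread and a simple wall-clock timeout (the timeout value and function name are illustrative):

val sessionTimeoutMs = 30 * 60 * 1000L  // e.g. 30 minutes

def updateWithExpiry(
    newValues: Seq[(String, Long, Long)],
    state: Option[Seq[(String, Long, Long)]]): Option[Seq[(String, Long, Long)]] = {
  val merged = state.getOrElse(Seq.empty) ++ newValues
  if (merged.isEmpty) {
    None                                   // nothing to keep for this key
  } else {
    val lastSeen = merged.map(_._3).max    // newest end time seen for this IP
    if (System.currentTimeMillis() - lastSeen > sessionTimeoutMs) None  // expire the key
    else Some(merged)
  }
}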




On 12/18/14, 8:25 PM, "Pierce Lamb"  wrote:

>This produces the expected output, thank you!
>
>On Thu, Dec 18, 2014 at 12:11 PM, Silvio Fiorito
> wrote:
>> Ok, I have a better idea of what you’re trying to do now.
>>
>> I think the prob might be the map. The first time the function runs,
>> currentValue will be None. Using map on None returns None.
>>
>> Instead, try:
>>
>> Some(currentValue.getOrElse(Seq.empty) ++ newValues)
>>
>> I think that should give you the expected result.
>>
>>
>> From: Pierce Lamb 
>> Date: Thursday, December 18, 2014 at 2:31 PM
>> To: Silvio Fiorito 
>> Cc: "user@spark.apache.org" 
>> Subject: Re: Help with updateStateByKey
>>
>> Hi Silvio,
>>
>> This is a great suggestion (I wanted to get rid of groupByKey), I have 
>>been
>> trying to implement it this morning, but having some trouble. I would 
>>love
>> to see your code for the function that goes inside updateStateByKey
>>
>> Here is my current code:
>>
>>  def updateGroupByKey( newValues: Seq[(String, Long, Long)],
>>   currentValue: Option[Seq[(String, Long, 
>>Long)]]
>>   ): Option[Seq[(String, Long, Long)]] = {
>>
>>   currentValue.map{ case (v) => v ++ newValues
>>   }
>> }
>>
>> val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)
>>
>>
>> However, when I run it the grouped DStream doesn't get populated with
>> anything. The issue is probably that currentValue is not actually an
>> Option[Seq[triple]] but rather an Option[triple]. However if I change 
>>it to
>> an Option[triple] then I have to also return an Option[triple] for
>> updateStateByKey to compile, but I want that return value to be an
>> Option[Seq[triple]] because ultimately i want the data to look like
>> (IPaddress, Seq[(pageRequested, startTime, EndTime), (pageRequested,
>> startTime, EndTime)...]) and have that Seq build over time
>>
>> Am I thinking about this wrong?
>>
>> Thank you
>>
>> On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito
>>  wrote:
>>>
>>> Hi Pierce,
>>>
>>> You shouldn’t have to use groupByKey because updateStateByKey will get 
>>>a
>>> Seq of all the values for that key already.
>>>
>>> I used that for realtime sessionization as well. What I did was key my
>>> incoming events, then send them to updateStateByKey. The 
>>>updateStateByKey
>>> function then received a Seq of the events and the Option of the 
>>>previous
>>> state for that key. The sessionization code then did its thing to 
>>>check if
>>> the incoming events were part of the same session, based on a 
>>>configured
>>> timeout. If a session already was active (from the previous state) and 
>>>it
>>> hadn’t exceeded the timeout, it used that value. Otherwise it 
>>>generated a
>>> new session id. Then the return value for the updateStateByKey function
>>> was a Tuple of session id and last timestamp.
>>>
>>> Then I joined the DStream with the session ids, which were both keyed 
>>>off
>>> the same id and continued my processing. Your requirements may be
>>> different, but that’s what worked well for me.
>>>
>>> Another thing to consider is cleaning up old sessions by returning 
>>>None in
>>> the updateStateByKey function. This will help with long running apps 
>>>and
>>> minimize memory usage (and checkpoint size).
>>>
>>> I was using something similar to the method above on a live production
>>> stream with very little CPU and memory footprint, running for weeks at 
>>>a
>>> time, processing up to 15M events per day with fluctuating traffic.
>>>
>>> Thanks,
>>> Silvio
>>>
>>>
>>>
>>> On 12/17/14, 10:07 PM, "Pierce Lamb" 
>>> wrote:
>>>
>>> >I am trying to run stateful Spark Streaming computations over (fake)
>>> >apache web server logs read from Kafka. The goal is to "sessionize"
>>> >the web traffic similar to this blog post:
>>>
>>> >http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
>>> >
>>> >The only difference is that I want to "sessionize" each page the IP
>>> >hits, instead of the entire session. I was able to do this reading
>>> >from a file of fake web traffic using Spark in batch mode, but now I
>>> >want to do it in a streaming context.
>>> >
>>> >Log files are read from Kafka and parsed into K/V pairs of
>>> >
>>> >(String, (String, Long, Long)) or
>>> >
>>> >(IP, (requestPage, time, time))
>>> >
>>> >I then call "groupByKey()" on this K/V pair. In batch mode, this would
>>> >produce a:
>>> >
>>> >(String, CollectionBuffer((String, Long, Long), ...) or
>>> >
>>> >(IP, CollectionBuffer((requestPage, time, time), ...)
>>> >
>>> >In a StreamingContext, it produces a:
>>> >
>>> >(String, ArrayBuffer((String, Long, Long), ...) like so:
>>> >
>>> >(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1

UNION two RDDs

2014-12-18 Thread Jerry Lam
Hi Spark users,

I wonder if val resultRDD = RDDA.union(RDDB) will always have records in
RDDA before records in RDDB.

Also, will resultRDD.coalesce(1) change this ordering?

Best Regards,

Jerry


Re: Standalone Spark program

2014-12-18 Thread Andrew Or
Hey Akshat,

What is the class that is not found, is it a Spark class or classes that
you define in your own application? If the latter, then Akhil's solution
should work (alternatively you can also pass the jar through the --jars
command line option in spark-submit).

If it's a Spark class, however, it's likely that the Spark assembly jar is
not present on the worker nodes. When you build Spark on the cluster, you
will need to rsync it to the same path on all the nodes in your cluster.
For more information, see
http://spark.apache.org/docs/latest/spark-standalone.html.

-Andrew
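
For reference, a sketch of both options; the paths and class names are placeholders, and the spark-submit line assumes a standalone master as in Akshat's snippet:

import org.apache.spark.{SparkConf, SparkContext}

// Option 1: ship the application jar programmatically.
val conf = new SparkConf(false)
  .setMaster("spark://foo.example.com:7077")
  .setAppName("foobar")
  .setJars(Seq("/path/to/your/project.jar"))   // or sc.addJar(...) after creating the context

val sc = new SparkContext(conf)

// Option 2: let spark-submit distribute the jars instead:
//   ./bin/spark-submit --master spark://foo.example.com:7077 \
//     --class com.example.Main --jars /path/to/dependency.jar /path/to/your/project.jar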

2014-12-18 10:29 GMT-08:00 Akhil Das :
>
> You can build a jar of your project and add it to the sparkContext
> (sc.addJar("/path/to/your/project.jar")) then it will get shipped to the
> worker and hence no classNotfoundException!
>
> Thanks
> Best Regards
>
> On Thu, Dec 18, 2014 at 10:06 PM, Akshat Aranya  wrote:
>>
>> Hi,
>>
>> I am building a Spark-based service which requires initialization of a
>> SparkContext in a main():
>>
>> def main(args: Array[String]) {
>> val conf = new SparkConf(false)
>>   .setMaster("spark://foo.example.com:7077")
>>   .setAppName("foobar")
>>
>> val sc = new SparkContext(conf)
>> val rdd = sc.parallelize(0 until 255)
>> val res =  rdd.mapPartitions(it => it).take(1)
>> println(s"res=$res")
>> sc.stop()
>> }
>>
>> This code works fine via REPL, but not as a standalone program; it causes
>> a ClassNotFoundException.  This has me confused about how code is shipped
>> out to executors.  When using via REPL, does the mapPartitions closure,
>> it=>it, get sent out when the REPL statement is executed?  When this code
>> is run as a standalone program (not via spark-submit), is the compiled code
>> expected to be present at the executor?
>>
>> Thanks,
>> Akshat
>>
>>


Spark GraphX question.

2014-12-18 Thread Tae-Hyuk Ahn
Hi All,

I am wondering what is the best way to remove transitive edges with maximum
spanning tree. For example,

Edges:
1 -> 2 (30)
2 -> 3 (30)
1 -> 3 (25)

where parenthesis is a weight for each edge.

Then, I'd like to get the reduced edges graph after "Transitive Reduction"
with considering the weight as a maximum spanning tree.

Edges:
1 -> 2 (30)
2 -> 3 (30)

Do you have a good idea for this?

Thanks,

Ted




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-GraphX-question-tp20768.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Creating a smaller, derivative RDD from an RDD

2014-12-18 Thread bethesda
We have a very large RDD and I need to create a new RDD whose values are
derived from each record of the original RDD, and we only retain the few new
records that meet a criteria.  I want to avoid creating a second large RDD
and then filtering it since I believe this could tax system resources
unnecessarily (tell me if that assumption is wrong.)

So for example, /and this is just an example/, say we have an RDD with 1 to
1,000,000 and we iterate through each value, compute its md5 hash, and
we only keep the results that start with 'A'.

What we've tried and seems to work but which seemed a bit ugly, and perhaps
not efficient, was the following in pseudocode. Is this the best way to do
this?

Thanks

bigRdd.flatMap( { i =>
  val h = md5(i)
  if (h.substring(1,1) == 'A') {
Array(h)
  } else {
Array[String]()
  }
})



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Creating-a-smaller-derivative-RDD-from-an-RDD-tp20769.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Control default partition when load a RDD from HDFS

2014-12-18 Thread Shuai Zheng
Hmmm, how would I do that? You mean create an RDD for each file? Then I will have
tons of RDDs.

And my calculation needs to rely on other input, not just the file itself.

 

Can you show some pseudo code for that logic?

 

Regards,

 

Shuai
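
For what it's worth, the "one RDD per file" idea quoted below can be sketched roughly like this; the paths and the preCalculate helper are placeholders, not part of the original suggestion:

// Run the file-level pre-calculation on each file separately,
// then union the (much smaller) per-file results.
val paths: Seq[String] = Seq("hdfs:///data/part-0001", "hdfs:///data/part-0002")  // ~1000 in practice

val perFileResults = paths.map { path =>
  sc.textFile(path)
    .coalesce(1)                                        // whole file in a single partition
    .mapPartitions(lines => preCalculate(path, lines))  // placeholder per-file pre-calculation
}

val combined = sc.union(perFileResults)  // continue the cross-file calculation on this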

 

From: Diego García Valverde [mailto:dgarci...@agbar.es] 
Sent: Wednesday, December 17, 2014 11:04 AM
To: Shuai Zheng; 'Sun, Rui'; user@spark.apache.org
Subject: RE: Control default partition when load a RDD from HDFS

 

Why isn't it a good option to create an RDD per 200MB file and then apply
the pre-calculations before merging them? I think the partitions per RDD
should be transparent to the pre-calculations, rather than fixed up front to
optimize the Spark map/reduce processes.

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Wednesday, December 17, 2014 16:01
To: 'Sun, Rui'; user@spark.apache.org
Subject: RE: Control default partition when load a RDD from HDFS

 

Nice, that is the answer I want. 

Thanks!

 

From: Sun, Rui [mailto:rui@intel.com] 
Sent: Wednesday, December 17, 2014 1:30 AM
To: Shuai Zheng; user@spark.apache.org
Subject: RE: Control default partition when load a RDD from HDFS

 

Hi, Shuai,

 

How did you turn off the file split in Hadoop? I guess you might have
implemented a customized FileInputFormat which overrides isSplitable() to
return FALSE. If you do have such FileInputFormat, you can simply pass it as
a constructor parameter to HadoopRDD or NewHadoopRDD in Spark.
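
A minimal sketch of what Rui describes, using the sc.newAPIHadoopFile convenience instead of constructing the RDD directly (paths are placeholders; define the input format class in your application jar so it is on the executor classpath):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// A TextInputFormat that refuses to split files, so each HDFS file
// becomes exactly one input split and hence one RDD partition.
class WholeFileTextInputFormat extends TextInputFormat {
  override def isSplitable(context: JobContext, file: Path): Boolean = false
}

val rdd = sc.newAPIHadoopFile[LongWritable, Text, WholeFileTextInputFormat]("hdfs:///data/input/*")

// One partition per file, so each mapPartitions call sees a whole file.
val perFile = rdd.mapPartitions { iter =>
  // file-level pre-calculation goes here; as a placeholder, just count the lines
  Iterator(iter.size)
}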

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Wednesday, December 17, 2014 4:16 AM
To: user@spark.apache.org
Subject: Control default partition when load a RDD from HDFS

 

Hi All,

 

My application loads 1000 files, each from 200MB to a few GB, and combines them
with other data to do calculations.

Some pre-calculation must be done at the file level; after that, the
results need to be combined for further calculation.

In Hadoop, it is simple because I can turn off the file split in the input
format (to enforce that each file goes to the same mapper), then do the
file-level calculation in the mapper and pass the result to the reducer. But in
Spark, how can I do it?

Basically I want to make sure that after I load these files into an RDD, it is
partitioned by file (no file splitting and no merging), so I can call
mapPartitions. Is there any way I can control the default partitioning when I
load the RDD?

This might be Spark's default partitioning behavior (partitioned by file when
the RDD is first loaded), but I can't find any documentation to support my
guess. If not, can I enforce this kind of partitioning? Because the total file
size is large, I don't want to re-partition in the code.

 

Regards,

 

Shuai

 




Re: hello

2014-12-18 Thread Harihar Nahak
You mean the Spark User List? It's pretty easy; check the first email, it has
all the instructions.

On 18 December 2014 at 21:56, csjtx1021 [via Apache Spark User List] <
ml-node+s1001560n20759...@n3.nabble.com> wrote:
>
> i want to join you
>


-- 
Regards,
Harihar Nahak
BigData Developer
Wynyard
Email:hna...@wynyardgroup.com | Extn: 8019




-
--Harihar
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/hello-tp20759p20770.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark GraphX question.

2014-12-18 Thread Harihar Nahak
Hi Ted,

I'm not familiar with Transitive Reduction, but you can get the expected result
with the graph.subgraph(...) syntax, which filters edges by their weight and
gives you a new graph per your condition.
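
A minimal sketch of that weight filter, assuming the edge attribute is the Double weight from Ted's example (the threshold is illustrative); note this only drops low-weight edges and is not by itself a transitive reduction (see Ted's follow-up later in the thread):

import org.apache.spark.graphx.{Edge, Graph}

val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 30.0),
  Edge(2L, 3L, 30.0),
  Edge(1L, 3L, 25.0)))

val graph = Graph.fromEdges(edges, defaultValue = 0)

// Keep only edges whose weight is at least the threshold.
val filtered = graph.subgraph(epred = triplet => triplet.attr >= 30.0)
filtered.edges.collect().foreach(println)   // Edge(1,2,30.0), Edge(2,3,30.0)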

On 19 December 2014 at 11:11, Tae-Hyuk Ahn [via Apache Spark User List] <
ml-node+s1001560n20768...@n3.nabble.com> wrote:
>
> Hi All,
>
> I am wondering what is the best way to remove transitive edges with
> maximum spanning tree. For example,
>
> Edges:
> 1 -> 2 (30)
> 2 -> 3 (30)
> 1 -> 3 (25)
>
> where parenthesis is a weight for each edge.
>
> Then, I'd like to get the reduced edges graph after "Transitive Reduction"
> with considering the weight as a maximum spanning tree.
>
> Edges:
> 1 -> 2 (30)
> 2 -> 3 (30)
>
> Do you have a good idea for this?
>
> Thanks,
>
> Ted
>
>


-- 
Regards,
Harihar Nahak
BigData Developer
Wynyard
Email:hna...@wynyardgroup.com | Extn: 8019




-
--Harihar
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-GraphX-question-tp20768p20771.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

How to increase parallelism in Yarn

2014-12-18 Thread Suman Somasundar
Hi,

 

I am using Spark 1.1.1 on Yarn. When I try to run K-Means, I see from the Yarn 
dashboard that only 3 containers are being used. How do I increase the number 
of containers used?

 

P.S: When I run K-Means on Mahout with the same settings, I see that there are 
25-30 containers being used.

 

Thanks,
Suman.


Re: Creating a smaller, derivative RDD from an RDD

2014-12-18 Thread Sean Owen
I don't think you can avoid examining each element of the RDD, if
that's what you mean. Your approach is basically the best you can do
in general. You're not making a second RDD here, and even if you did
this in two steps, the second RDD is really more of a bookkeeping step than
a second huge data structure.

You can simplify your example a bit, although I doubt it's noticeably faster:

bigRdd.flatMap { i =>
  val h = md5(i)
  if (h(0) == 'A') {
Some(h)
  } else {
None
  }
}

This is also fine, simpler still, and if it's slower, not by much:

bigRdd.map(md5).filter(_(0) == 'A')
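
If anyone wants to run this end to end, a self-contained version of the same idea follows; the md5 helper below is only an illustration (the original post does not show its definition):

import java.security.MessageDigest

def md5(i: Int): String =
  MessageDigest.getInstance("MD5")
    .digest(i.toString.getBytes("UTF-8"))
    .map("%02X".format(_))
    .mkString

val bigRdd = sc.parallelize(1 to 1000000)

// Single pass: hash each value, then keep only hashes starting with 'A'.
val keepers = bigRdd.map(md5).filter(_(0) == 'A')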


On Thu, Dec 18, 2014 at 10:18 PM, bethesda  wrote:
> We have a very large RDD and I need to create a new RDD whose values are
> derived from each record of the original RDD, and we only retain the few new
> records that meet a criteria.  I want to avoid creating a second large RDD
> and then filtering it since I believe this could tax system resources
> unnecessarily (tell me if that assumption is wrong.)
>
> So for example, /and this is just an example/, say we have an RDD with 1 to
> 1,000,000 and we iterate through each value, and compute it's md5 hash, and
> we only keep the results that start with 'A'.
>
> What we've tried and seems to work but which seemed a bit ugly, and perhaps
> not efficient, was the following in pseudocode. * Is this the best way to do
> this?*
>
> Thanks
>
> bigRdd.flatMap( { i =>
>   val h = md5(i)
>   if (h.substring(1,1) == 'A') {
> Array(h)
>   } else {
> Array[String]()
>   }
> })
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Creating-a-smaller-derivative-RDD-from-an-RDD-tp20769.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to increase parallelism in Yarn

2014-12-18 Thread Andrew Or
Hi Suman,

I'll assume that you are using spark submit to run your application. You
can pass the --num-executors flag to ask for more containers. If you want
to allocate more memory for each executor, you may also pass in the
--executor-memory flag (this accepts a string in the format 1g, 512m etc.).
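
For example (the executor count, memory sizes, and the class/jar names below are illustrative; --executor-cores is also available if you want more cores per container):

./bin/spark-submit --master yarn-cluster \
  --num-executors 25 \
  --executor-memory 4g \
  --executor-cores 2 \
  --class com.example.KMeansJob /path/to/your-app.jar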

-Andrew

2014-12-18 14:37 GMT-08:00 Suman Somasundar :
>
> Hi,
>
>
>
> I am using Spark 1.1.1 on Yarn. When I try to run K-Means, I see from the
> Yarn dashboard that only 3 containers are being used. How do I increase the
> number of containers used?
>
>
>
> P.S: When I run K-Means on Mahout with the same settings, I see that there
> are 25-30 containers being used.
>
>
>
> Thanks,
> Suman.
>


Re: MLLib /ALS : java.lang.OutOfMemoryError: Java heap space

2014-12-18 Thread Xiangrui Meng
Hi Jay,

Please try increasing executor memory (if the available memory is more
than 2GB) and reduce numBlocks in ALS. The current implementation
stores all subproblems in memory and hence the memory requirement is
significant when k is large. You can also try reducing k and see
whether the problem is still there. I made a PR that improves the ALS
implementation, which generates subproblems one by one. You can try
that as well.

https://github.com/apache/spark/pull/3720

Best,
Xiangrui
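
As a concrete illustration of those knobs (the tiny ratings RDD and the parameter values are only placeholders, using the MLlib 1.x builder API):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Tiny synthetic ratings just to make the snippet self-contained.
val ratings = sc.parallelize(Seq(Rating(1, 10, 4.0), Rating(1, 20, 3.0), Rating(2, 10, 5.0)))

val model = new ALS()
  .setRank(20)        // smaller k means smaller subproblems
  .setIterations(10)
  .setLambda(0.01)
  .setBlocks(20)      // numBlocks, as discussed above
  .run(ratings)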

On Wed, Dec 17, 2014 at 6:57 PM, buring  wrote:
> I am not sure this can help you. I have 57 million ratings, about 4 million users
> and 4k items. I used 7-14 total-executor-cores, executor-memory 13g; the cluster
> has 4 nodes, each with 4 cores and max memory 16g.
> I found that the following settings may help avoid this problem:
> conf.set("spark.shuffle.memoryFraction","0.65") //default is 0.2
> conf.set("spark.storage.memoryFraction","0.3")//default is 0.6
> I have to set the rank value under 40, otherwise this problem occurs.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-ALS-java-lang-OutOfMemoryError-Java-heap-space-tp20584p20755.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Do I need to applied feature scaling via StandardScaler for LBFGS for Linear Regression?

2014-12-18 Thread Bui, Tri
Thanks dbtsai for the info.

Are you using the case class for:
Case(response, vec) => ?

Also, what library do I need to import to use .toBreeze ?

Thanks, 
tri
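
Regarding the earlier "value features is not a member of (Double, Vector)" error: that happens when mapping over (label, vector) tuples instead of LabeledPoint. A sketch of scaling the features of an RDD[LabeledPoint] (whether to also scale the response, as DB describes, is a separate step):

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.regression.LabeledPoint

// parsedata: RDD[LabeledPoint], as built earlier in this thread.
val scaler = new StandardScaler(withMean = true, withStd = true)
  .fit(parsedata.map(_.features))

val scaled = parsedata.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))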

-Original Message-
From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com] 
Sent: Friday, December 12, 2014 3:27 PM
To: Bui, Tri
Cc: user@spark.apache.org
Subject: Re: Do I need to applied feature scaling via StandardScaler for LBFGS 
for Linear Regression?

You can do something like the following.

val rddVector = input.map({
  case (response, vec) => {
val newVec = MLUtils.appendBias(vec)
newVec.toBreeze(newVec.size - 1) = response
newVec
  }
})

val scalerWithResponse = new StandardScaler(true, true).fit(rddVector)

val trainingData =  scalerWithResponse.transform(rddVector).map(x=> {
  (x(x.size - 1), Vectors.dense(x.toArray.slice(0, x.size - 1)))
})

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Fri, Dec 12, 2014 at 12:23 PM, Bui, Tri  wrote:
> Thanks for the info.
>
> How do I use StandardScaler() to scale example data  (10246.0,[14111.0,1.0]) ?
>
> Thx
> tri
>
> -Original Message-
> From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com]
> Sent: Friday, December 12, 2014 1:26 PM
> To: Bui, Tri
> Cc: user@spark.apache.org
> Subject: Re: Do I need to applied feature scaling via StandardScaler for 
> LBFGS for Linear Regression?
>
It seems that your response is not scaled, which will cause issues in LBFGS. 
Typically, people train Linear Regression with zero-mean/unit-variance 
features and response, without training the intercept. Since the response is 
zero-mean, the intercept will always be zero. When you convert the 
coefficients back to the original space from the scaled space, the intercept can be 
computed as w0 = ȳ - \sum x̄_n w_n, where x̄_n is the average of column n and ȳ is the average of the response.
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Fri, Dec 12, 2014 at 10:49 AM, Bui, Tri  
> wrote:
>> Thanks for the confirmation.
>>
>> Fyi..The code below works for similar dataset, but with the feature 
>> magnitude changed,  LBFGS converged to the right weights.
>>
>> Example, time sequential Feature value 1, 2, 3, 4, 5, would generate the 
>> error while sequential feature 14111, 14112, 14113,14115 would converge to  
>> the right weight.  Why?
>>
>> Below is code to implement standardscaler() for sample data  
>> (10246.0,[14111.0,1.0])):
>>
>> val scaler1 = new StandardScaler().fit(train.map(x => x.features)) 
>> val
>> train1 = train.map(x => (x.label, scaler1.transform(x.features)))
>>
>> But I keep getting the error: "value features is not a member of (Double, 
>> org.apache.spark.mllib.linalg.Vector)"
>>
>> Should my feature vector be .toInt instead of Double?
>>
>> Also, the error  org.apache.spark.mllib.linalg.Vector  should have an 
>> "s" to match import library org.apache.spark.mllib.linalg.Vectors
>>
>> Thanks
>> Tri
>>
>>
>>
>>
>>
>> -Original Message-
>> From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com]
>> Sent: Friday, December 12, 2014 12:16 PM
>> To: Bui, Tri
>> Cc: user@spark.apache.org
>> Subject: Re: Do I need to applied feature scaling via StandardScaler for 
>> LBFGS for Linear Regression?
>>
>> You need to apply the StandardScaler yourself to help convergence.
>> LBFGS just takes whatever objective function you provide without doing any 
>> scaling. I would like to provide LinearRegressionWithLBFGS, which does the 
>> scaling internally, in the near future.
>>
>> Sincerely,
>>
>> DB Tsai
>> ---
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Fri, Dec 12, 2014 at 8:49 AM, Bui, Tri 
>>  wrote:
>>> Hi,
>>>
>>>
>>>
>>> Trying to use LBFGS as the optimizer, do I need to implement feature 
>>> scaling via StandardScaler or does LBFGS do it by default?
>>>
>>>
>>>
>>> Following code  generated error “ Failure again!  Giving up and 
>>> returning, Maybe the objective is just poorly behaved ?”.
>>>
>>>
>>>
>>> val data = sc.textFile("file:///data/Train/final2.train")
>>>
>>> val parsedata = data.map { line =>
>>>
>>> val partsdata = line.split(',')
>>>
>>> LabeledPoint(partsdata(0).toDouble, Vectors.dense(partsdata(1).split('
>>> ').map(_.toDouble)))
>>>
>>> }
>>>
>>>
>>>
>>> val train = parsedata.map(x => (x.label,
>>> MLUtils.appendBias(x.features))).cache()
>>>
>>>
>>>
>>> val numCorrections = 10
>>>
>>> val convergenceTol = 1e-4
>>>
>>> val maxNumIterations = 50
>>>
>>> val regParam = 0.1
>>>
>>> val initialWeightsWithIntercept = Vectors.dense(new 
>>> Array[Double](2))
>>>
>>>
>>>
>>> val (weightsWithIntercept, loss) = LBFGS.runLBFGS(train,
>>>
>>>   new LeastSquaresGradient(),
>>>
>>>   new SquaredL2Updater(),
>>>
>>>   numCorrections,
>>>
>>>   convergenceTol,
>>>
>>>   maxNumIterations

Sharing sqlContext between Akka router and "routee" actors ...

2014-12-18 Thread Manoj Samel
Hi,

Akka router creates a sqlContext and creates a bunch of "routees" actors
 with sqlContext as parameter. The actors then execute query on that
sqlContext.

Would this pattern be a issue ? Any other way sparkContext etc. should be
shared cleanly in Akka routers/routees ?

Thanks,


Re: does spark sql support columnar compression with encoding when caching tables

2014-12-18 Thread Michael Armbrust
There is only column level encoding (run length encoding, delta encoding,
dictionary encoding) and no generic compression.
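
For reference, the cache path being discussed looks roughly like this (the table name is a placeholder); the flag only toggles the column-level encodings listed above:

sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
sqlContext.cacheTable("events")   // a table registered earlier, e.g. via registerTempTable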

On Thu, Dec 18, 2014 at 12:07 PM, Sadhan Sood  wrote:
>
> Hi All,
>
> Wondering if when caching a table backed by lzo compressed parquet data,
> if spark also compresses it (using lzo/gzip/snappy) along with column level
> encoding, or just does the column level encoding when
> "spark.sql.inMemoryColumnarStorage.compressed"
> is set to true. This is because when I try to cache the data, I notice
> the memory being used is almost as much as the uncompressed size of the
> data.
>
> Thanks!
>


Re: Sharing sqlContext between Akka router and "routee" actors ...

2014-12-18 Thread Soumya Simanta
Why do you need a router? I mean, couldn't you do this with just one actor which
has the SQLContext inside it?
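
A bare-bones sketch of that single-actor variant (the message type and names are made up); since one SparkContext/SQLContext can accept jobs submitted from multiple threads, the same instance could equally back several routees if a router is kept:

import akka.actor.{Actor, ActorSystem, Props}
import org.apache.spark.sql.SQLContext

case class RunQuery(sql: String)   // hypothetical message type

class QueryActor(sqlContext: SQLContext) extends Actor {
  def receive = {
    case RunQuery(q) =>
      // Each query becomes a separate Spark job on the shared context.
      sender ! sqlContext.sql(q).collect()
  }
}

// Wiring: create the context once, then hand it to the actor(s).
// val sqlContext = new SQLContext(sc)
// val system = ActorSystem("spark-queries")
// val queryActor = system.actorOf(Props(new QueryActor(sqlContext)), "query-actor")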

On Thu, Dec 18, 2014 at 9:45 PM, Manoj Samel 
wrote:

> Hi,
>
> Akka router creates a sqlContext and creates a bunch of "routees" actors
>  with sqlContext as parameter. The actors then execute query on that
> sqlContext.
>
> Would this pattern be a issue ? Any other way sparkContext etc. should be
> shared cleanly in Akka routers/routees ?
>
> Thanks,
>


java.lang.ExceptionInInitializerError/Unable to load YARN support

2014-12-18 Thread maven
All, 

I just built Spark-1.2 on my enterprise server (which has Hadoop 2.3 with
YARN). Here're the steps I followed for the build: 

$ mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package 
$ export SPARK_HOME=/path/to/spark/folder 
$ export HADOOP_CONF_DIR=/etc/hadoop/conf 

However, when I try to work with this installation either locally or on
YARN, I get the following error: 

Exception in thread "main" java.lang.ExceptionInInitializerError 
at
org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784) 
at
org.apache.spark.storage.BlockManager.(BlockManager.scala:105) 
at
org.apache.spark.storage.BlockManager.(BlockManager.scala:180) 
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292) 
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159) 
at org.apache.spark.SparkContext.(SparkContext.scala:232) 
at water.MyDriver$.main(MyDriver.scala:19) 
at water.MyDriver.main(MyDriver.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:606) 
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:360) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: org.apache.spark.SparkException: Unable to load YARN support 
at
org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:199)
 
at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:194) 
at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala) 
... 15 more 
Caused by: java.lang.IllegalArgumentException: Invalid rule: L 
RULE:[2:$1@$0](.*@XXXCOMPANY.COM)s/(.*)@XXXCOMPANY.COM/$1/L 
DEFAULT 
at
org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321)
 
at
org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386)
 
at
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75)
 
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247)
 
at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
 
at
org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:43) 
at
org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.(YarnSparkHadoopUtil.scala:45)
 
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method) 
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 
at java.lang.reflect.Constructor.newInstance(Constructor.java:526) 
at java.lang.Class.newInstance(Class.java:374) 
at
org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:196)
 
... 17 more 

I noticed that when I unset HADOOP_CONF_DIR, I'm able to work in the local
mode without any errors. I'm able to work with pre-installed Spark 1.0,
locally and on yarn, without any issues. It looks like I may be missing a
configuration step somewhere. Any thoughts on what may be causing this? 

NR



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ExceptionInInitializerError-Unable-to-load-YARN-support-tp20775.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark GraphX question.

2014-12-18 Thread Tae-Hyuk Ahn
Thanks, Harihar. 

But this is slightly more complicated than just using subgraph(filter()). 

See the transitive reduction. 
http://en.wikipedia.org/wiki/Transitive_reduction

My case has one additional requirement: the weight needs to be taken into
account (like a maximum spanning tree).

Using a linear transitive reduction algorithm (and taking some hints from
"TriangleCount.scala" in GraphX), it might have steps such as:

1. Compute the set of neighbors for each vertex.
2. For each edge, compute the intersection of the sets and send the weight
to both vertices.
3. For each vertex, mark an edge as "False" if it has the intersection, but
lower weight.
4. Remove all "False" edges using subgraph.

But I am sure the GraphX developers might have a more scalable and succinct
idea for this problem.

Thanks,

Ted



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-GraphX-question-tp20768p20777.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SPARK-2243 Support multiple SparkContexts in the same JVM

2014-12-18 Thread Anton Brazhnyk
Well, that's actually what I need (one simple app, several contexts, similar to 
what JobServer does) and I'm just looking for some workaround here. 
Classloaders look a little easier for me than spawning my own processes.
Being more specific, I just need to be able to execute arbitrary Spark jobs 
from a long-lived web application with no prior knowledge of those jobs, so I 
need to accept jars containing those jobs (again, like JobServer).

As far as I understand, I can't load jars into a SparkContext that has already 
spawned executors on the cluster. I need to create a new one to load new jars. 
Am I right about this?


-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Thursday, December 18, 2014 2:04 AM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: SPARK-2243 Support multiple SparkContexts in the same JVM

Yes, although once you have multiple ClassLoaders, you are operating as if in 
multiple JVMs for most intents and purposes. I think the request for this kind 
of functionality comes from use cases where multiple ClassLoaders wouldn't 
work, like, wanting to have one app (in one ClassLoader) managing multiple 
contexts.

On Thu, Dec 18, 2014 at 2:23 AM, Anton Brazhnyk  
wrote:
> Greetings,
>
>
>
> First comment on the issue says that reason for non-supporting of 
> multiple contexts is “There are numerous assumptions in the code base 
> that uses a shared cache or thread local variables or some global 
> identifiers which prevent us from using multiple SparkContext's.”
>
>
>
> May it be worked around by creating those context in several 
> classloaders with their own copies of Spark classes?
>
>
>
> Thanks,
>
> Anton



Re: UNION two RDDs

2014-12-18 Thread madhu phatak
Hi,
coalesce is an operation which changes the number of records in a partition. It
will not touch the ordering of the rows, AFAIK.
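
A quick way to check this empirically (treat the observed order as an implementation detail rather than a documented guarantee):

val rddA = sc.parallelize(Seq(1, 2, 3), 2)
val rddB = sc.parallelize(Seq(4, 5, 6), 2)

// With shuffle = false (the default), the observed output keeps the A-then-B order.
val result = rddA.union(rddB).coalesce(1)
println(result.collect().toSeq)   // List(1, 2, 3, 4, 5, 6) in practice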

On Fri, Dec 19, 2014 at 2:22 AM, Jerry Lam  wrote:
>
> Hi Spark users,
>
> I wonder if val resultRDD = RDDA.union(RDDB) will always have records in
> RDDA before records in RDDB.
>
> Also, will resultRDD.coalesce(1) change this ordering?
>
> Best Regards,
>
> Jerry
>


-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: Can we specify driver running on a specific machine of the cluster on yarn-cluster mode?

2014-12-18 Thread madhu phatak
Hi,
 In yarn-cluster mode the driver runs inside the YARN ApplicationMaster, on a
node chosen by the resource manager, so you cannot pin it to a specific machine.
(In yarn-client mode it runs on the machine you submit from.)

On Thu, Dec 18, 2014 at 3:44 PM, LinQili  wrote:
>
> Hi all,
> On yarn-cluster mode, can we let the driver running on a specific machine
> that we choose in cluster ? Or, even the machine not in the cluster?
>


-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


When will spark 1.2 released?

2014-12-18 Thread vboylin1...@gmail.com
Hi, does anyone know when Spark 1.2 will be released? 1.2 has many great features
that we can't wait for ;-)

Sincerely,
Lin Wukang

Sent from NetEase Mail Master

Re: SchemaRDD.sample problem

2014-12-18 Thread madhu phatak
Hi,
Could you clean up the code a little? It's hard to read what's going
on. You can use pastebin or gist to share the code.

On Wed, Dec 17, 2014 at 3:58 PM, Hao Ren  wrote:
>
> Hi,
>
> I am using SparkSQL on 1.2.1 branch. The problem comes froms the following
> 4-line code:
>
> val t1: SchemaRDD = hiveContext hql "select * from product where is_new =
> 0"
> val tb1: SchemaRDD = t1.sample(withReplacement = false, fraction = 0.05)
> tb1.registerTempTable("t1_tmp")
> (hiveContext sql "select count(*) from t1_tmp where is_new = 1") collect
> foreach println
>
> We know that *t1* contains only rows whose "is_new" field is zero.
> After sampling t1 by taking 5% rows, normally, the sampled table should
> always contains only rows where "is_new" = 0. However, line 4 gives a
> number
> about 5 by chance. That means there are some rows where "is_new = 1" in the
> sampled table, which is not logically possible.
>
> I am not sure SchemaRDD.sample is doing his work well.
>
> Any idea ?
>
> Hao
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-sample-problem-tp20741.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: When will spark 1.2 released?

2014-12-18 Thread madhu phatak
It’s on Maven Central already http://search.maven.org/#browse%7C717101892

On Fri, Dec 19, 2014 at 11:17 AM, vboylin1...@gmail.com <
vboylin1...@gmail.com> wrote:
>
> Hi,
>Dose any know when will spark 1.2 released? 1.2 has many great feature
> that we can't wait now ,-)
>
> Sincely
> Lin wukang
>
>
> Sent from NetEase Mail Master
>


-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: When will spark 1.2 released?

2014-12-18 Thread Ted Yu
Interesting, the maven artifacts were dated Dec 10th. 
However vote for RC2 closed recently:
http://search-hadoop.com/m/JW1q5K8onk2/Patrick+spark+1.2.0&subj=Re+VOTE+Release+Apache+Spark+1+2+0+RC2+

Cheers

On Dec 18, 2014, at 10:02 PM, madhu phatak  wrote:

> It’s on Maven Central already http://search.maven.org/#browse%7C717101892
> 
> On Fri, Dec 19, 2014 at 11:17 AM, vboylin1...@gmail.com 
>  wrote:
>> 
>> Hi, 
>>Dose any know when will spark 1.2 released? 1.2 has many great feature 
>> that we can't wait now ,-)
>> 
>> Sincely
>> Lin wukang
>> 
>> 
>> Sent from NetEase Mail Master
> 
> 
> -- 
> Regards,
> Madhukara Phatak
> http://www.madhukaraphatak.com


Re: When will spark 1.2 released?

2014-12-18 Thread Andrew Ash
Patrick is working on the release as we speak -- I expect it'll be out
later tonight (US west coast) or tomorrow at the latest.

On Fri, Dec 19, 2014 at 1:09 AM, Ted Yu  wrote:
>
> Interesting, the maven artifacts were dated Dec 10th.
> However vote for RC2 closed recently:
>
> http://search-hadoop.com/m/JW1q5K8onk2/Patrick+spark+1.2.0&subj=Re+VOTE+Release+Apache+Spark+1+2+0+RC2+
>
> Cheers
>
> On Dec 18, 2014, at 10:02 PM, madhu phatak  wrote:
>
> It’s on Maven Central already http://search.maven.org/#browse%7C717101892
>
> On Fri, Dec 19, 2014 at 11:17 AM, vboylin1...@gmail.com <
> vboylin1...@gmail.com> wrote:
>>
>> Hi,
>>Dose any know when will spark 1.2 released? 1.2 has many great feature
>> that we can't wait now ,-)
>>
>> Sincely
>> Lin wukang
>>
>>
>> Sent from NetEase Mail Master
>>
>
>
> --
> Regards,
> Madhukara Phatak
> http://www.madhukaraphatak.com
>
>


Re: When will spark 1.2 released?

2014-12-18 Thread Matei Zaharia
Yup, as he posted before, "An Apache infrastructure issue prevented me from 
pushing this last night. The issue was resolved today and I should be able to 
push the final release artifacts tonight."

> On Dec 18, 2014, at 10:14 PM, Andrew Ash  wrote:
> 
> Patrick is working on the release as we speak -- I expect it'll be out later 
> tonight (US west coast) or tomorrow at the latest.
> 
> On Fri, Dec 19, 2014 at 1:09 AM, Ted Yu wrote:
> Interesting, the maven artifacts were dated Dec 10th. 
> However vote for RC2 closed recently:
> http://search-hadoop.com/m/JW1q5K8onk2/Patrick+spark+1.2.0&subj=Re+VOTE+Release+Apache+Spark+1+2+0+RC2+
>  
> 
> 
> Cheers
> 
> On Dec 18, 2014, at 10:02 PM, madhu phatak wrote:
> 
>> It’s on Maven Central already http://search.maven.org/#browse%7C717101892 
>> 
>> 
>> On Fri, Dec 19, 2014 at 11:17 AM, vboylin1...@gmail.com wrote:
>> Hi, 
>>Dose any know when will spark 1.2 released? 1.2 has many great feature 
>> that we can't wait now ,-)
>> 
>> Sincely
>> Lin wukang
>> 
>> 
>> Sent from NetEase Mail Master
>> 
>> 
>> -- 
>> Regards,
>> Madhukara Phatak
>> http://www.madhukaraphatak.com