RE: Spark cluster set up on EC2 customization

2015-02-26 Thread Sameer Tilak
Thanks!

Date: Thu, 26 Feb 2015 12:51:21 +0530
Subject: Re: Spark cluster set up on EC2 customization
From: ak...@sigmoidanalytics.com
To: ssti...@live.com
CC: user@spark.apache.org

You can easily add a function (say setup_pig) inside the function setup_cluster 
in this scriptThanksBest Regards

On Thu, Feb 26, 2015 at 7:08 AM, Sameer Tilak  wrote:






Hi,
I was looking at the documentation for deploying Spark cluster on EC2. 
http://spark.apache.org/docs/latest/ec2-scripts.html
We are using Pig to build the data pipeline and then use MLLib for analytics. I 
was wondering if someone has any experience to include additional 
tools/services such as Pig/Hadoop in the above deployment script?  


  

  

Re: Re: Many Receiver vs. Many threads per Receiver

2015-02-26 Thread bit1...@163.com
Sure, Thanks Tathagata! 



bit1...@163.com
 
From: Tathagata Das
Date: 2015-02-26 14:47
To: bit1...@163.com
CC: Akhil Das; user
Subject: Re: Re: Many Receiver vs. Many threads per Receiver
Spark Streaming has a new Kafka direct stream, to be release as experimental 
feature with 1.3. That uses a low level consumer. Not sure if it satisfies your 
purpose. 
If you want more control, its best to create your own Receiver with the low 
level Kafka API. 

TD

On Tue, Feb 24, 2015 at 12:09 AM, bit1...@163.com  wrote:
Thanks Akhil.
Not sure whether thelowlevel consumer.will be officially supported by Spark 
Streaming. So far, I don't see it mentioned/documented in the spark streaming 
programming guide.



bit1...@163.com
 
From: Akhil Das
Date: 2015-02-24 16:21
To: bit1...@163.com
CC: user
Subject: Re: Many Receiver vs. Many threads per Receiver
I believe when you go with 1, it will distribute the consumer across your 
cluster (possibly on 6 machines), but still it i don't see a away to tell from 
which partition it will consume etc. If you are looking to have a consumer 
where you can specify the partition details and all, then you are better off 
with the lowlevel consumer.



Thanks
Best Regards

On Tue, Feb 24, 2015 at 9:36 AM, bit1...@163.com  wrote:
Hi,
I  am experimenting Spark Streaming and Kafka Integration, To read messages 
from Kafka in parallel, basically there are two ways
1. Create many Receivers like (1 to 6).map(_ => KakfaUtils.createStream). 
2. Specifiy many threads when calling KakfaUtils.createStream like val 
topicMap("myTopic"=>6), this will create one receiver with 6 reading threads.

My question is which option is better, sounds option 2 is better is to me 
because it saves a lot of cores(one Receiver one core), but I learned from 
somewhere else that choice 1 is better, so I would ask and see how you guys 
elaborate on this. Thank



bit1...@163.com




Re: Re: Many Receiver vs. Many threads per Receiver

2015-02-26 Thread Jeffrey Jedele
As I understand the matter:

Option 1) has benefits when you think that your network bandwidth may be a
bottle neck, because Spark opens several network connections on possibly
several different physical machines.

Option 2) - as you already pointed out - has the benefit that you occupy
less worker cores with receiver tasks.

Regards,
Jeff

2015-02-26 9:38 GMT+01:00 bit1...@163.com :

> Sure, Thanks Tathagata!
>
> --
> bit1...@163.com
>
>
> *From:* Tathagata Das 
> *Date:* 2015-02-26 14:47
> *To:* bit1...@163.com
> *CC:* Akhil Das ; user 
> *Subject:* Re: Re: Many Receiver vs. Many threads per Receiver
> Spark Streaming has a new Kafka direct stream, to be release as
> experimental feature with 1.3. That uses a low level consumer. Not sure if
> it satisfies your purpose.
> If you want more control, its best to create your own Receiver with the
> low level Kafka API.
>
> TD
>
> On Tue, Feb 24, 2015 at 12:09 AM, bit1...@163.com  wrote:
>
>> Thanks Akhil.
>> Not sure whether thelowlevel consumer.
>> will be officially
>> supported by Spark Streaming. So far, I don't see it mentioned/documented
>> in the spark streaming programming guide.
>>
>> --
>> bit1...@163.com
>>
>>
>> *From:* Akhil Das 
>> *Date:* 2015-02-24 16:21
>> *To:* bit1...@163.com
>> *CC:* user 
>> *Subject:* Re: Many Receiver vs. Many threads per Receiver
>> I believe when you go with 1, it will distribute the consumer across your
>> cluster (possibly on 6 machines), but still it i don't see a away to tell
>> from which partition it will consume etc. If you are looking to have a
>> consumer where you can specify the partition details and all, then you are
>> better off with the lowlevel consumer.
>> 
>>
>>
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Feb 24, 2015 at 9:36 AM, bit1...@163.com  wrote:
>>
>>> Hi,
>>> I  am experimenting Spark Streaming and Kafka Integration, To read
>>> messages from Kafka in parallel, basically there are two ways
>>> 1. Create many Receivers like (1 to 6).map(_ =>
>>> KakfaUtils.createStream).
>>> 2. Specifiy many threads when calling KakfaUtils.createStream like val
>>> topicMap("myTopic"=>6), this will create one receiver with 6 reading
>>> threads.
>>>
>>> My question is which option is better, sounds option 2 is better is to
>>> me because it saves a lot of cores(one Receiver one core), but I
>>> learned from somewhere else that choice 1 is better, so I would ask and see
>>> how you guys elaborate on this. Thank
>>>
>>> --
>>> bit1...@163.com
>>>
>>
>>
>


Re: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell

2015-02-26 Thread Patrick Varilly
Hi, Akhil,

In your definition of sdp_d
,
all your fields are of type Option[X].  In Scala, a value of type Option[X]
can hold one of two things:

1. None
2. Some(x), where x is of type X

So to fix your immediate problem, wrap all your parameters to the sdp_d
constructor in Some(...), as follows:

   new sdp_d(Some(r(0).trim.toInt), Some(r(1).trim.toInt), Some(r(2).trim),
...

Your earlier question of why writing sdp_d(...) for a case class works but
you need to write new sdp_d(...) for an explicit class, there's a simple
answer.  When you create a case class X in scala, Scala also makes a
companion object X behind the scenes with an apply method that calls new
(see below).  Scala's rules will call this apply method automatically.  So,
when you write "X(...)", you're really calling "X.apply(...)" which in turn
calls "new X(...)".  (This is the same trick behind writing things like
List(1,2,3))  If you don't use a case class, you'd have to make the
companion object yourself explicitly.

For reference, this statement:

   case class X(a: A, b: B)

is conceptually equivalent to

   class X(val a: A, val b: B) extends ... {

  override def toString: String = // Auto-generated
  override def hashCode: Int = // Auto-generated
  override def equals(that: Any): Boolean = // Auto-generated

  ... more convenience methods ...
   }

   object X {
  def apply(a: A, b: B) = new X(a, b)
  ... more convenience methods ...
   }

If you want to peek under the hood, try compiling a simple X.scala file
with the line "case class X(a: Int, b: Double)", then taking apart the
generated X.class and X$.class (e.g., "javap X.class").

More info here ,
here  and in
Programming
in Scala  ch 15.

Hope that helps!

Best,

Patrick

On Thu, Feb 26, 2015 at 6:37 AM, anamika gupta 
wrote:

> I am now getting the following error. I cross-checked my types and
> corrected three of them i.e. r26-->String, r27-->Timestamp,
> r28-->Timestamp. This error still persists.
>
> scala>
> sc.textFile("/home/cdhuser/Desktop/Sdp_d.csv").map(_.split(",")).map { r =>
>  | val upto_time = sdf.parse(r(23).trim);
>  | calendar.setTime(upto_time);
>  | val r23 = new java.sql.Timestamp(upto_time.getTime)
>  | val insert_time = sdf.parse(r(27).trim)
>  | calendar.setTime(insert_time)
>  | val r27 = new java.sql.Timestamp(insert_time.getTime)
>  | val last_upd_time = sdf.parse(r(28).trim)
>  | calendar.setTime(last_upd_time)
>  | val r28 = new java.sql.Timestamp(last_upd_time.getTime)
>  | new sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim,
> r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim,
> r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim,
> r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim,
> r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim,
> r(25).trim, r(26).trim, r27, r28)
>  | }.registerAsTable("sdp_d")
>
> :26: error: type mismatch;
>  found   : Int
>  required: Option[Int]
>   new sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim,
> r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim,
> r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim,
> r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim,
> r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim,
> r(25).trim, r(26).trim, r27, r28)
>
> On Wed, Feb 25, 2015 at 2:32 PM, Akhil Das 
> wrote:
>
>> It says sdp_d not found, since it is a class you need to instantiate it
>> once. like:
>>
>> sc.textFile("derby.log").map(_.split(",")).map( r => {
>>   val upto_time = sdf.parse(r(23).trim);
>>   calendar.setTime(upto_time);
>>   val r23 = new java.sql.Timestamp(upto_time.getTime);
>>
>>   val insert_time = sdf.parse(r(26).trim);
>>   calendar.setTime(insert_time);
>>   val r26 = new java.sql.Timestamp(insert_time.getTime);
>>
>>   val last_upd_time = sdf.parse(r(27).trim);
>>   calendar.setTime(last_upd_time);
>>   val r27 = new java.sql.Timestamp(last_upd_time.getTime);
>>
>>   *new* *sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim,
>> r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim,
>> r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim,
>> r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim,
>> r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim,
>> r(25).trim, r26, r27, r(28).trim)*
>>   }).registerAsTable("sdp")
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Feb 25, 2015 at 2:14 PM, anamika gupta 
>

Issue with deploye Driver in cluster mode

2015-02-26 Thread pankaj
Hi,

I have 3 node spark cluster

node1 , node2 and node 3

I running below command on node 1 for deploying driver

/usr/local/spark-1.2.1-bin-hadoop2.4/bin/spark-submit --class
com.fst.firststep.aggregator.FirstStepMessageProcessor --master
spark://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:7077 --deploy-mode cluster
--supervise file:///home/xyz/sparkstreaming-0.0.1-SNAPSHOT.jar
/home/xyz/config.properties

driver gets launched on node 2 in cluster. but getting exception on node 2
that it is trying to bind to node 1 ip.


2015-02-26 08:47:32 DEBUG AkkaUtils:63 - In createActorSystem, requireCookie
is: off
2015-02-26 08:47:32 INFO  Slf4jLogger:80 - Slf4jLogger started
2015-02-26 08:47:33 ERROR NettyTransport:65 - failed to bind to
ec2-xx.xx.xx.xx.compute-1.amazonaws.com/xx.xx.xx.xx:0, shutting down Netty
transport
2015-02-26 08:47:33 WARN  Utils:71 - Service 'Driver' could not bind on port
0. Attempting port 1.
2015-02-26 08:47:33 DEBUG AkkaUtils:63 - In createActorSystem, requireCookie
is: off
2015-02-26 08:47:33 ERROR Remoting:65 - Remoting error: [Startup failed] [
akka.remote.RemoteTransportException: Startup failed
at
akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136)
at akka.remote.Remoting.start(Remoting.scala:201)
at
akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
at
akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at
org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at
org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1765)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1756)
at
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at
org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:33)
at
org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to:
ec2-xx-xx-xx.compute-1.amazonaws.com/xx.xx.xx.xx:0
at
org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at
akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
at
akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
 

kindly suggest

Thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-deploye-Driver-in-cluster-mode-tp21821.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark-SQL 1.2.0 "sort by" results are not consistent with Hive

2015-02-26 Thread Cheng Lian
Could you check the Spark web UI for the number of tasks issued when the 
query is executed? I digged out |mapred.map.tasks| because I saw 2 tasks 
were issued.


On 2/26/15 3:01 AM, Kannan Rajah wrote:

Cheng, We tried this setting and it still did not help. This was on 
Spark 1.2.0.



--
Kannan

On Mon, Feb 23, 2015 at 6:38 PM, Cheng Lian > wrote:


(Move to user list.)

Hi Kannan,

You need to set |mapred.map.tasks| to 1 in hive-site.xml. The
reason is this line of code

,
which overrides |spark.default.parallelism|. Also,
|spark.sql.shuffle.parallelism| isn’t used here since there’s no
shuffle involved (we only need to sort within a partition).

Default value of |mapred.map.tasks| is 2
. You
may see that the Spark SQL result can be divided into two sorted
parts from the middle.

Cheng

On 2/19/15 10:33 AM, Kannan Rajah wrote:


According to hive documentation, "sort by" is supposed to order the results
for each reducer. So if we set a single reducer, then the results should be
sorted, right? But this is not happening. Any idea why? Looks like the
settings I am using to restrict the number of reducers is not having an
effect.

*Tried the following:*

Set spark.default.parallelism to 1

Set spark.sql.shuffle.partitions to 1

These were set in hive-site.xml and also inside spark shell.


*Spark-SQL*

create table if not exists testSortBy (key int, name string, age int);
LOAD DATA LOCAL INPATH '/home/mapr/sample-name-age.txt' OVERWRITE INTO TABLE
testSortBy;
select * from testSortBY;

1Aditya28
2aash25
3prashanth27
4bharath26
5terry27
6nanda26
7pradeep27
8pratyay26


set spark.default.parallelism=1;

set spark.sql.shuffle.partitions=1;

select name,age from testSortBy sort by age; aash 25 bharath 26 prashanth
27 Aditya 28 nanda 26 pratyay 26 terry 27 pradeep 27 *HIVE* select name,age
from testSortBy sort by age;

aash25
bharath26
nanda26
pratyay26
prashanth27
terry27
pradeep27
Aditya28


--
Kannan


​



​


GroupByKey causing problem

2015-02-26 Thread Tushar Sharma
Hi,

I am trying to apply binning to a large CSV dataset. Here are the steps I
am taking:

1. Emit each value of CSV as (ColIndex,(RowIndex,value))
2. Then I groupByKey (here ColumnIndex) and get all values of a particular
index to one node, as I have to work on the collection of all values
3. I apply my binning algorithm which is as follows:
a. Sort the values
b. Iterate through values and see if it is different than the previous
one
if no then add it to the same bin
if yes then check the size of that bin, if it is greater than a
particular size (say 5% of wholedataset) then change the bin
number, else keep the same bin
c. repeat for each column

Due to this algorithm I can't calculate it partition wise and merge for
final result. But even for groupByKey I expect it should work , maybe
slowly, but it should finish. I increased the partition to reduce the
output of each groupByKey so that it helps in successful completion of the
process. But even with that it is stuck at the same stage. The log for
executor says:

ExternalMapAppendOnly(splilling to disk) (Trying ...)

The code works for small CSV files but can't complete for big files.

val inputfile = "hdfs://hm41:9000/user/file1"
val table = sc.textFile(inputfile,1000)

val withoutHeader: RDD[String] = dropHeader(table)

val kvPairs = withoutHeader.flatMap(retAtrTuple)

//val filter_na = kvPairs.map{case (x,y) => (x,if(y == "NA") "" else y)}

val isNum = kvPairs.map{case (x,y) => (x,isNumeric(y))}.reduceByKey(_&&_)

val numeric_indexes = isNum.filter{case (x,y) => y}.sortByKey().map{case
(x,y) => x}.collect()
//val isNum_Arr = isNum.sortByKey().collect()

val kvidx = withoutHeader.zipWithIndex
//val t = kvidx.map{case (a,b) => retAtrTuple(a).map(x =>(x,b)) }


val t = kvidx.flatMap{case (a,b) => retAtrTuple(a).map(x =>(x,b)) }
val t2 = t.filter{case (a,b) => numeric_indexes contains a._1 }

//val t2 = t.filter{case (a,b) => a._1 ==0 }
val t3 = t2.map{case ((a,b),c) => (a,(c,b.toDouble))}
//val t4 = t3.sortBy(_._2._1)
val t4 = t3.groupByKey.map{case (a,b) =>
(a,classing_summary(b.toArray.sortBy(_._2)))}

def dropHeader(data: RDD[String]): RDD[String] = {
data.mapPartitionsWithIndex((idx, lines) => {
  if (idx == 0) {
lines.drop(1)
  }
  lines
})
  }


  def retAtrTuple(x: String) = {
val newX = x.split(',')
for (h <- 0 until newX.length)
  yield (h, newX(h))
  }

def isNumeric(s: String): Boolean = {
(allCatch opt s.toDouble).isDefined
  }

def classing_summary(arr: Array[(Long, Double)]) = {
  var idx = 0L
  var value = 0.0
  var prevValue = Double.MinValue
  var counter = 1
  var classSize = 0.0
  var size = arr.length

  val output = for(i <- 0 until arr.length) yield {
  idx = arr(i)._1;
  value = arr(i)._2;
  if(value==prevValue){
classSize+=1.0/size;
//println("both values same")
//println(idx,value,classSize,counter,classSize);
prevValue = value;
(idx,value,counter,classSize);
  }
  else if(classSize<(0.05)){
classSize+=1.0/size;
//println("both values not same, adding to present bucket")
//println(idx,value,classSize,counter,classSize);
prevValue = value;
(idx,value,counter,classSize);
  }
  else {
classSize = 1.0/size;
counter +=1;
//println("both values not same, adding to different bucket")
//println(idx,value,classSize,counter,classSize);
prevValue = value;
(idx,value,counter,classSize);
  }
  }
  output.toArray
}

Thanks in advance,

Tushar Sharma


Re: Executor lost with too many temp files

2015-02-26 Thread Marius Soutier
Yeah did that already (65k). We also disabled swapping and reduced the amount 
of memory allocated to Spark (available - 4). This seems to have resolved the 
situation.

Thanks!

> On 26.02.2015, at 05:43, Raghavendra Pandey  
> wrote:
> 
> Can you try increasing the ulimit -n on your machine.
> 
> On Mon, Feb 23, 2015 at 10:55 PM, Marius Soutier  > wrote:
> Hi Sameer,
> 
> I’m still using Spark 1.1.1, I think the default is hash shuffle. No external 
> shuffle service.
> 
> We are processing gzipped JSON files, the partitions are the amount of input 
> files. In my current data set we have ~850 files that amount to 60 GB (so 
> ~600 GB uncompressed). We have 5 workers with 8 cores and 48 GB RAM each. We 
> extract five different groups of data from this to filter, clean and 
> denormalize (i.e. join) it for easier downstream processing.
> 
> By the way this code does not seem to complete at all without using 
> coalesce() at a low number, 5 or 10 work great. Everything above that make it 
> very likely it will crash, even on smaller datasets (~300 files). But I’m not 
> sure if this is related to the above issue.
> 
> 
>> On 23.02.2015, at 18:15, Sameer Farooqui > > wrote:
>> 
>> Hi Marius,
>> 
>> Are you using the sort or hash shuffle?
>> 
>> Also, do you have the external shuffle service enabled (so that the Worker 
>> JVM or NodeManager can still serve the map spill files after an Executor 
>> crashes)?
>> 
>> How many partitions are in your RDDs before and after the problematic 
>> shuffle operation?
>> 
>> 
>> 
>> On Monday, February 23, 2015, Marius Soutier > > wrote:
>> Hi guys,
>> 
>> I keep running into a strange problem where my jobs start to fail with the 
>> dreaded "Resubmitted (resubmitted due to lost executor)” because of having 
>> too many temp files from previous runs.
>> 
>> Both /var/run and /spill have enough disk space left, but after a given 
>> amount of jobs have run, following jobs will struggle with completion. There 
>> are a lot of failures without any exception message, only the above 
>> mentioned lost executor. As soon as I clear out /var/run/spark/work/ and the 
>> spill disk, everything goes back to normal.
>> 
>> Thanks for any hint,
>> - Marius
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org <>
>> For additional commands, e-mail: user-h...@spark.apache.org <>
>> 
> 
> 



Re: Scheduler hang?

2015-02-26 Thread Victor Tso-Guillen
The data is small. The job is composed of many small stages.

* I found that with fewer than 222 the problem exhibits. What will be
gained by going higher?
* Pushing up the parallelism only pushes up the boundary at which the
system appears to hang. I'm worried about some sort of message loss or
inconsistency.
* Yes, we are using Kryo.
* I'll try that, but I'm again a little confused why you're recommending
this. I'm stumped so might as well?

On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das 
wrote:

> What operation are you trying to do and how big is the data that you are
> operating on?
>
> Here's a few things which you can try:
>
> - Repartition the RDD to a higher number than 222
> - Specify the master as local[*] or local[10]
> - Use Kryo Serializer (.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer"))
> - Enable RDD Compression (.set("spark.rdd.compress","true") )
>
>
> Thanks
> Best Regards
>
> On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen 
> wrote:
>
>> I'm getting this really reliably on Spark 1.2.1. Basically I'm in local
>> mode with parallelism at 8. I have 222 tasks and I never seem to get far
>> past 40. Usually in the 20s to 30s it will just hang. The last logging is
>> below, and a screenshot of the UI.
>>
>> 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
>> TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
>> localhost (1/5)
>> 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-10]
>> Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent
>> to driver
>> 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-8]
>> Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent
>> to driver
>> 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
>> TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
>> localhost (2/5)
>> 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
>> TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
>> localhost (3/5)
>> 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch worker-9]
>> Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent
>> to driver
>> 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
>> TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
>> localhost (4/5)
>>
>> [image: Inline image 1]
>> What should I make of this? Where do I start?
>>
>> Thanks,
>> Victor
>>
>
>


Re: Scheduler hang?

2015-02-26 Thread Victor Tso-Guillen
Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
dependencies that produce no data?

On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen  wrote:

> The data is small. The job is composed of many small stages.
>
> * I found that with fewer than 222 the problem exhibits. What will be
> gained by going higher?
> * Pushing up the parallelism only pushes up the boundary at which the
> system appears to hang. I'm worried about some sort of message loss or
> inconsistency.
> * Yes, we are using Kryo.
> * I'll try that, but I'm again a little confused why you're recommending
> this. I'm stumped so might as well?
>
> On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das 
> wrote:
>
>> What operation are you trying to do and how big is the data that you are
>> operating on?
>>
>> Here's a few things which you can try:
>>
>> - Repartition the RDD to a higher number than 222
>> - Specify the master as local[*] or local[10]
>> - Use Kryo Serializer (.set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer"))
>> - Enable RDD Compression (.set("spark.rdd.compress","true") )
>>
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen 
>> wrote:
>>
>>> I'm getting this really reliably on Spark 1.2.1. Basically I'm in local
>>> mode with parallelism at 8. I have 222 tasks and I never seem to get far
>>> past 40. Usually in the 20s to 30s it will just hang. The last logging is
>>> below, and a screenshot of the UI.
>>>
>>> 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
>>> TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
>>> localhost (1/5)
>>> 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-10]
>>> Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent
>>> to driver
>>> 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-8]
>>> Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent
>>> to driver
>>> 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
>>> TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
>>> localhost (2/5)
>>> 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
>>> TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
>>> localhost (3/5)
>>> 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch worker-9]
>>> Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent
>>> to driver
>>> 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
>>> TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
>>> localhost (4/5)
>>>
>>> [image: Inline image 1]
>>> What should I make of this? Where do I start?
>>>
>>> Thanks,
>>> Victor
>>>
>>
>>
>


CollectAsMap, Broadcasting.

2015-02-26 Thread Guillermo Ortiz
I have a question,

If I execute this code,

val users = sc.textFile("/tmp/users.log").map(x => x.split(",")).map(
v => (v(0), v(1)))
val contacts = sc.textFile("/tmp/contacts.log").map(y =>
y.split(",")).map( v => (v(0), v(1)))
val usersMap = contacts.collectAsMap()
contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()

When I execute collectAsMap, where is data? in each Executor?? I guess
than each executor has data that it proccesed. The result is sent to
the driver, but I guess that each executor keeps its piece of
processed data.

I guess that it's more efficient that to use a join in this case
because there's not shuffle but If I save usersMap as a broadcast
variable, wouldn't it be less efficient because I'm sending data to
executors and don't need it?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Scheduler hang?

2015-02-26 Thread Akhil Das
Not many that i know of, but i bumped into this one
https://issues.apache.org/jira/browse/SPARK-4516

Thanks
Best Regards

On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen  wrote:

> Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
> dependencies that produce no data?
>
> On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen 
> wrote:
>
>> The data is small. The job is composed of many small stages.
>>
>> * I found that with fewer than 222 the problem exhibits. What will be
>> gained by going higher?
>> * Pushing up the parallelism only pushes up the boundary at which the
>> system appears to hang. I'm worried about some sort of message loss or
>> inconsistency.
>> * Yes, we are using Kryo.
>> * I'll try that, but I'm again a little confused why you're recommending
>> this. I'm stumped so might as well?
>>
>> On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das 
>> wrote:
>>
>>> What operation are you trying to do and how big is the data that you are
>>> operating on?
>>>
>>> Here's a few things which you can try:
>>>
>>> - Repartition the RDD to a higher number than 222
>>> - Specify the master as local[*] or local[10]
>>> - Use Kryo Serializer (.set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer"))
>>> - Enable RDD Compression (.set("spark.rdd.compress","true") )
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen 
>>> wrote:
>>>
 I'm getting this really reliably on Spark 1.2.1. Basically I'm in local
 mode with parallelism at 8. I have 222 tasks and I never seem to get far
 past 40. Usually in the 20s to 30s it will just hang. The last logging is
 below, and a screenshot of the UI.

 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
 TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
 localhost (1/5)
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-10]
 Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent
 to driver
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-8]
 Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent
 to driver
 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
 TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
 localhost (2/5)
 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
 TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
 localhost (3/5)
 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch worker-9]
 Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent
 to driver
 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
 TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
 localhost (4/5)

 [image: Inline image 1]
 What should I make of this? Where do I start?

 Thanks,
 Victor

>>>
>>>
>>
>


Re: spark streaming: stderr does not roll

2015-02-26 Thread Jeffrey Jedele
So the summarize (I had a similar question):
Spark's log4j per default is configured to log to the console? Those
messages end up in the stderr files and the approach does not support
rolling?

If I configure log4j to log to files, how can I keep the folder structure?
Should I use relative paths and assume that those end up in the same
folders the stderr files do?

Regards,
Jeff

2015-02-25 9:35 GMT+01:00 Sean Owen :

> These settings don't control what happens to stderr, right? stderr is
> up to the process that invoked the driver to control. You may wish to
> configure log4j to log to files instead.
>
> On Wed, Nov 12, 2014 at 8:15 PM, Nguyen, Duc 
> wrote:
> > I've also tried setting the aforementioned properties using
> > System.setProperty() as well as on the command line while submitting the
> job
> > using --conf key=value. All to no success. When I go to the Spark UI and
> > click on that particular streaming job and then the "Environment" tab, I
> can
> > see the properties are correctly set. But regardless of what I've tried,
> the
> > "stderr" log file on the worker nodes does not roll and continues to
> > grow...leading to a crash of the cluster once it claims 100% of disk. Has
> > anyone else encountered this? Anyone?
> >
> >
> >
> > On Fri, Nov 7, 2014 at 3:35 PM, Nguyen, Duc 
> wrote:
> >>
> >> We are running spark streaming jobs (version 1.1.0). After a sufficient
> >> amount of time, the stderr file grows until the disk is full at 100% and
> >> crashes the cluster. I've read this
> >>
> >> https://github.com/apache/spark/pull/895
> >>
> >> and also read this
> >>
> >> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
> >>
> >>
> >> So I've tried testing with this in an attempt to get the stderr log file
> >> to roll.
> >>
> >> sparkConf.set("spark.executor.logs.rolling.strategy", "size")
> >> .set("spark.executor.logs.rolling.size.maxBytes", "1024")
> >> .set("spark.executor.logs.rolling.maxRetainedFiles", "3")
> >>
> >>
> >> Yet it does not roll and continues to grow. Am I missing something
> >> obvious?
> >>
> >>
> >> thanks,
> >> Duc
> >>
> >
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark standalone with multiple executors in one work node

2015-02-26 Thread Sean Owen
--num-executors is the total number of executors. In YARN there is not
quite the same notion of a Spark worker. Of course, one worker has an
executor for each running app, so yes, but you mean for one app? it's
possible, though not usual, to run multiple executors for one app on
one worker. This may be useful if your executor heap size is otherwise
getting huge.

On Thu, Feb 26, 2015 at 1:58 AM, Judy Nash
 wrote:
> Hello,
>
>
>
> Does spark standalone support running multiple executors in one worker node?
>
>
>
> It seems yarn has the parameter --num-executors  to set number of executors
> to deploy, but I do not find the equivalent parameter in spark standalone.
>
>
>
>
>
> Thanks,
>
> Judy

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark streaming: stderr does not roll

2015-02-26 Thread Sean Owen
I think that's up to you. You can make it log wherever you want, and
have some control over how log4j names the rolled log files by
configuring its file-based rolling appender.

On Thu, Feb 26, 2015 at 10:05 AM, Jeffrey Jedele
 wrote:
> So the summarize (I had a similar question):
> Spark's log4j per default is configured to log to the console? Those
> messages end up in the stderr files and the approach does not support
> rolling?
>
> If I configure log4j to log to files, how can I keep the folder structure?
> Should I use relative paths and assume that those end up in the same folders
> the stderr files do?
>
> Regards,
> Jeff
>
> 2015-02-25 9:35 GMT+01:00 Sean Owen :
>>
>> These settings don't control what happens to stderr, right? stderr is
>> up to the process that invoked the driver to control. You may wish to
>> configure log4j to log to files instead.
>>
>> On Wed, Nov 12, 2014 at 8:15 PM, Nguyen, Duc 
>> wrote:
>> > I've also tried setting the aforementioned properties using
>> > System.setProperty() as well as on the command line while submitting the
>> > job
>> > using --conf key=value. All to no success. When I go to the Spark UI and
>> > click on that particular streaming job and then the "Environment" tab, I
>> > can
>> > see the properties are correctly set. But regardless of what I've tried,
>> > the
>> > "stderr" log file on the worker nodes does not roll and continues to
>> > grow...leading to a crash of the cluster once it claims 100% of disk.
>> > Has
>> > anyone else encountered this? Anyone?
>> >
>> >
>> >
>> > On Fri, Nov 7, 2014 at 3:35 PM, Nguyen, Duc 
>> > wrote:
>> >>
>> >> We are running spark streaming jobs (version 1.1.0). After a sufficient
>> >> amount of time, the stderr file grows until the disk is full at 100%
>> >> and
>> >> crashes the cluster. I've read this
>> >>
>> >> https://github.com/apache/spark/pull/895
>> >>
>> >> and also read this
>> >>
>> >> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>> >>
>> >>
>> >> So I've tried testing with this in an attempt to get the stderr log
>> >> file
>> >> to roll.
>> >>
>> >> sparkConf.set("spark.executor.logs.rolling.strategy", "size")
>> >> .set("spark.executor.logs.rolling.size.maxBytes", "1024")
>> >> .set("spark.executor.logs.rolling.maxRetainedFiles", "3")
>> >>
>> >>
>> >> Yet it does not roll and continues to grow. Am I missing something
>> >> obvious?
>> >>
>> >>
>> >> thanks,
>> >> Duc
>> >>
>> >
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Sean Owen
No, it exists only on the driver, not the executors. Executors don't
retain partitions unless they are supposed to be persisted.

Generally, broadcasting a small Map to accomplish a join 'manually' is
more efficient than a join, but you are right that this is mostly
because joins usually involve shuffles. If not, it's not as clear
which way is best. I suppose that if the Map is large-ish, it's safer
to not keep pulling it to the driver.

On Thu, Feb 26, 2015 at 10:00 AM, Guillermo Ortiz  wrote:
> I have a question,
>
> If I execute this code,
>
> val users = sc.textFile("/tmp/users.log").map(x => x.split(",")).map(
> v => (v(0), v(1)))
> val contacts = sc.textFile("/tmp/contacts.log").map(y =>
> y.split(",")).map( v => (v(0), v(1)))
> val usersMap = contacts.collectAsMap()
> contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()
>
> When I execute collectAsMap, where is data? in each Executor?? I guess
> than each executor has data that it proccesed. The result is sent to
> the driver, but I guess that each executor keeps its piece of
> processed data.
>
> I guess that it's more efficient that to use a join in this case
> because there's not shuffle but If I save usersMap as a broadcast
> variable, wouldn't it be less efficient because I'm sending data to
> executors and don't need it?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Guillermo Ortiz
So, on my example, when I execute:
val usersMap = contacts.collectAsMap() --> Map goes to the driver and
just lives there in the beginning.
contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect

When I execute usersMap(v._1),
Does driver has to send to the executorX the "value" which it needs? I
guess I'm missing something.
How does the data transfer among usersMap(just in the driver) and
executors work?

On this case it looks like better to use broadcasting like:
val usersMap = contacts.collectAsMap()
val bc = sc.broadcast(usersMap)
contacts.map(v => (v._1, (bc.value(v._1), v._2))).collect()

2015-02-26 11:16 GMT+01:00 Sean Owen :
> No, it exists only on the driver, not the executors. Executors don't
> retain partitions unless they are supposed to be persisted.
>
> Generally, broadcasting a small Map to accomplish a join 'manually' is
> more efficient than a join, but you are right that this is mostly
> because joins usually involve shuffles. If not, it's not as clear
> which way is best. I suppose that if the Map is large-ish, it's safer
> to not keep pulling it to the driver.
>
> On Thu, Feb 26, 2015 at 10:00 AM, Guillermo Ortiz  
> wrote:
>> I have a question,
>>
>> If I execute this code,
>>
>> val users = sc.textFile("/tmp/users.log").map(x => x.split(",")).map(
>> v => (v(0), v(1)))
>> val contacts = sc.textFile("/tmp/contacts.log").map(y =>
>> y.split(",")).map( v => (v(0), v(1)))
>> val usersMap = contacts.collectAsMap()
>> contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()
>>
>> When I execute collectAsMap, where is data? in each Executor?? I guess
>> than each executor has data that it proccesed. The result is sent to
>> the driver, but I guess that each executor keeps its piece of
>> processed data.
>>
>> I guess that it's more efficient that to use a join in this case
>> because there's not shuffle but If I save usersMap as a broadcast
>> variable, wouldn't it be less efficient because I'm sending data to
>> executors and don't need it?
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Sean Owen
No. That code is just Scala code executing on the driver. usersMap is
a local object. This bit has nothing to do with Spark.

Yes you would have to broadcast it to use it efficient in functions
(not on the driver).

On Thu, Feb 26, 2015 at 10:24 AM, Guillermo Ortiz  wrote:
> So, on my example, when I execute:
> val usersMap = contacts.collectAsMap() --> Map goes to the driver and
> just lives there in the beginning.
> contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect
>
> When I execute usersMap(v._1),
> Does driver has to send to the executorX the "value" which it needs? I
> guess I'm missing something.
> How does the data transfer among usersMap(just in the driver) and
> executors work?
>
> On this case it looks like better to use broadcasting like:
> val usersMap = contacts.collectAsMap()
> val bc = sc.broadcast(usersMap)
> contacts.map(v => (v._1, (bc.value(v._1), v._2))).collect()
>
> 2015-02-26 11:16 GMT+01:00 Sean Owen :
>> No, it exists only on the driver, not the executors. Executors don't
>> retain partitions unless they are supposed to be persisted.
>>
>> Generally, broadcasting a small Map to accomplish a join 'manually' is
>> more efficient than a join, but you are right that this is mostly
>> because joins usually involve shuffles. If not, it's not as clear
>> which way is best. I suppose that if the Map is large-ish, it's safer
>> to not keep pulling it to the driver.
>>
>> On Thu, Feb 26, 2015 at 10:00 AM, Guillermo Ortiz  
>> wrote:
>>> I have a question,
>>>
>>> If I execute this code,
>>>
>>> val users = sc.textFile("/tmp/users.log").map(x => x.split(",")).map(
>>> v => (v(0), v(1)))
>>> val contacts = sc.textFile("/tmp/contacts.log").map(y =>
>>> y.split(",")).map( v => (v(0), v(1)))
>>> val usersMap = contacts.collectAsMap()
>>> contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()
>>>
>>> When I execute collectAsMap, where is data? in each Executor?? I guess
>>> than each executor has data that it proccesed. The result is sent to
>>> the driver, but I guess that each executor keeps its piece of
>>> processed data.
>>>
>>> I guess that it's more efficient that to use a join in this case
>>> because there's not shuffle but If I save usersMap as a broadcast
>>> variable, wouldn't it be less efficient because I'm sending data to
>>> executors and don't need it?
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Scheduler hang?

2015-02-26 Thread Victor Tso-Guillen
Thanks for the link. Unfortunately, I turned on rdd compression and nothing
changed. I tried moving netty -> nio and no change :(

On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das 
wrote:

> Not many that i know of, but i bumped into this one
> https://issues.apache.org/jira/browse/SPARK-4516
>
> Thanks
> Best Regards
>
> On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen 
> wrote:
>
>> Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
>> dependencies that produce no data?
>>
>> On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen 
>> wrote:
>>
>>> The data is small. The job is composed of many small stages.
>>>
>>> * I found that with fewer than 222 the problem exhibits. What will be
>>> gained by going higher?
>>> * Pushing up the parallelism only pushes up the boundary at which the
>>> system appears to hang. I'm worried about some sort of message loss or
>>> inconsistency.
>>> * Yes, we are using Kryo.
>>> * I'll try that, but I'm again a little confused why you're recommending
>>> this. I'm stumped so might as well?
>>>
>>> On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das 
>>> wrote:
>>>
 What operation are you trying to do and how big is the data that you
 are operating on?

 Here's a few things which you can try:

 - Repartition the RDD to a higher number than 222
 - Specify the master as local[*] or local[10]
 - Use Kryo Serializer (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))
 - Enable RDD Compression (.set("spark.rdd.compress","true") )


 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen 
 wrote:

> I'm getting this really reliably on Spark 1.2.1. Basically I'm in
> local mode with parallelism at 8. I have 222 tasks and I never seem to get
> far past 40. Usually in the 20s to 30s it will just hang. The last logging
> is below, and a screenshot of the UI.
>
> 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
> TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
> localhost (1/5)
> 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
> worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes
> result sent to driver
> 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-8]
> Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result 
> sent
> to driver
> 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
> TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
> localhost (2/5)
> 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
> TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
> localhost (3/5)
> 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch worker-9]
> Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result 
> sent
> to driver
> 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
> TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
> localhost (4/5)
>
> [image: Inline image 1]
> What should I make of this? Where do I start?
>
> Thanks,
> Victor
>


>>>
>>
>


How to read from hdfs using spark-shell in Intel hadoop?

2015-02-26 Thread MEETHU MATHEW
Hi,
I am not able to read from HDFS(Intel distribution hadoop,Hadoop version is 
1.0.3) from spark-shell(spark version is 1.2.1). I built spark using the 
commandmvn -Dhadoop.version=1.0.3 clean package and started  spark-shell and 
read a HDFS file using sc.textFile() and the exception is  
 WARN hdfs.DFSClient: Failed to connect to /10.88.6.133:50010, add to deadNodes 
and continuejava.net.SocketTimeoutException: 12 millis timeout while 
waiting for channel to be ready for read. ch : 
java.nio.channels.SocketChannel[connected local=/10.88.6.131:44264 
remote=/10.88.6.133:50010]

The same problem is asked in the this mail.
 RE: Spark is unable to read from HDFS
|   |
|   |   |   |   |   |
| RE: Spark is unable to read from HDFSHi,Thanks for the reply. I've tried the 
below.  |
|  |
| View on mail-archives.us.apache.org | Preview by Yahoo |
|  |
|   |

  As suggested in the above mail,"In addition to specifying 
HADOOP_VERSION=1.0.3 in the ./project/SparkBuild.scala file, you will need to 
specify the libraryDependencies and name "spark-core"  resolvers. Otherwise, 
sbt will fetch version 1.0.3 of hadoop-core from apache instead of Intel. You 
can set up your own local or remote repository that you specify" 
Now HADOOP_VERSION is deprecated and -Dhadoop.version should be used. Can 
anybody please elaborate on how to specify tat SBT should fetch hadoop-core 
from Intel which is in our internal repository?
Thanks & Regards,
Meethu M

Re: How to efficiently control concurrent Spark jobs

2015-02-26 Thread Jeffrey Jedele
So basically you have lots of small ML tasks you want to run concurrently?

With "I've used repartition and cache to store the sub-datasets on only one
machine" you mean that you reduced each RDD to have one partition only?

Maybe you want to give the fair scheduler a try to get more of your tasks
executing concurrently. Just an idea...

Regards,
Jeff

2015-02-25 12:06 GMT+01:00 Staffan :

> Hi,
> Is there a good way (recommended way) to control and run multiple Spark
> jobs
> within the same application? My application is like follows;
>
> 1) Run one Spark job on a 'ful' dataset, which then creates a few thousands
> of RDDs containing sub-datasets from the complete dataset. Each of the
> sub-datasets are independent from the others (the 'ful' dataset is simply a
> dump from a database containing several different types of records).
> 2) Run some filtration and manipulations on each of the RDD and finally do
> some ML on the data. (Each of the created RDD's from step 1) is completely
> independent so this should be run concurrently).
>
> I've implemented this by using Scala Futures and executing the Spark jobs
> in
> 2) from a separate thread for each RDD. This works and improves runtime
> compared to a naive for-loop over step 2). Scaling is however not as good
> as
> I would expect it to be. (28 minutes for 4 cores on 1 machine -> 19 minutes
> for 12 cores on 3 machines).
>
> Each of the sub-datasets are fairly small so I've used 'repartition' and
> 'cache' to store the sub-datasets on only one machine in step 1), this
> improved runtime a few %.
>
> So, either do anyone have a suggestion of how to do this in a better way or
> perhaps if there a higher level workflow tool that I can use on top of
> Spark? (The cool solution would have been to use nestled RDDs and just map
> over them in a high level way, but as this is not supported afaik).
>
> Thanks!
> Staffan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-efficiently-control-concurrent-Spark-jobs-tp21800.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Guillermo Ortiz
Isn't it "contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()"
executed in the executors?  why is it executed in the driver?
contacts are not a local object, right?


2015-02-26 11:27 GMT+01:00 Sean Owen :
> No. That code is just Scala code executing on the driver. usersMap is
> a local object. This bit has nothing to do with Spark.
>
> Yes you would have to broadcast it to use it efficient in functions
> (not on the driver).
>
> On Thu, Feb 26, 2015 at 10:24 AM, Guillermo Ortiz  
> wrote:
>> So, on my example, when I execute:
>> val usersMap = contacts.collectAsMap() --> Map goes to the driver and
>> just lives there in the beginning.
>> contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect
>>
>> When I execute usersMap(v._1),
>> Does driver has to send to the executorX the "value" which it needs? I
>> guess I'm missing something.
>> How does the data transfer among usersMap(just in the driver) and
>> executors work?
>>
>> On this case it looks like better to use broadcasting like:
>> val usersMap = contacts.collectAsMap()
>> val bc = sc.broadcast(usersMap)
>> contacts.map(v => (v._1, (bc.value(v._1), v._2))).collect()
>>
>> 2015-02-26 11:16 GMT+01:00 Sean Owen :
>>> No, it exists only on the driver, not the executors. Executors don't
>>> retain partitions unless they are supposed to be persisted.
>>>
>>> Generally, broadcasting a small Map to accomplish a join 'manually' is
>>> more efficient than a join, but you are right that this is mostly
>>> because joins usually involve shuffles. If not, it's not as clear
>>> which way is best. I suppose that if the Map is large-ish, it's safer
>>> to not keep pulling it to the driver.
>>>
>>> On Thu, Feb 26, 2015 at 10:00 AM, Guillermo Ortiz  
>>> wrote:
 I have a question,

 If I execute this code,

 val users = sc.textFile("/tmp/users.log").map(x => x.split(",")).map(
 v => (v(0), v(1)))
 val contacts = sc.textFile("/tmp/contacts.log").map(y =>
 y.split(",")).map( v => (v(0), v(1)))
 val usersMap = contacts.collectAsMap()
 contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()

 When I execute collectAsMap, where is data? in each Executor?? I guess
 than each executor has data that it proccesed. The result is sent to
 the driver, but I guess that each executor keeps its piece of
 processed data.

 I guess that it's more efficient that to use a join in this case
 because there's not shuffle but If I save usersMap as a broadcast
 variable, wouldn't it be less efficient because I'm sending data to
 executors and don't need it?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Sean Owen
Yes, in that code, usersMap has been serialized to every executor.
I thought you were referring to accessing the copy in the driver.

On Thu, Feb 26, 2015 at 10:47 AM, Guillermo Ortiz  wrote:
> Isn't it "contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()"
> executed in the executors?  why is it executed in the driver?
> contacts are not a local object, right?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell

2015-02-26 Thread anamika gupta
Hi Patrick

Thanks a ton for your in-depth answer. The compilation error is now
resolved.

Thanks a lot again !!

On Thu, Feb 26, 2015 at 2:40 PM, Patrick Varilly <
patrick.vari...@dataminded.be> wrote:

> Hi, Akhil,
>
> In your definition of sdp_d
> ,
> all your fields are of type Option[X].  In Scala, a value of type Option[X]
> can hold one of two things:
>
> 1. None
> 2. Some(x), where x is of type X
>
> So to fix your immediate problem, wrap all your parameters to the sdp_d
> constructor in Some(...), as follows:
>
>new sdp_d(Some(r(0).trim.toInt), Some(r(1).trim.toInt),
> Some(r(2).trim), ...
>
> Your earlier question of why writing sdp_d(...) for a case class works but
> you need to write new sdp_d(...) for an explicit class, there's a simple
> answer.  When you create a case class X in scala, Scala also makes a
> companion object X behind the scenes with an apply method that calls new
> (see below).  Scala's rules will call this apply method automatically.  So,
> when you write "X(...)", you're really calling "X.apply(...)" which in turn
> calls "new X(...)".  (This is the same trick behind writing things like
> List(1,2,3))  If you don't use a case class, you'd have to make the
> companion object yourself explicitly.
>
> For reference, this statement:
>
>case class X(a: A, b: B)
>
> is conceptually equivalent to
>
>class X(val a: A, val b: B) extends ... {
>
>   override def toString: String = // Auto-generated
>   override def hashCode: Int = // Auto-generated
>   override def equals(that: Any): Boolean = // Auto-generated
>
>   ... more convenience methods ...
>}
>
>object X {
>   def apply(a: A, b: B) = new X(a, b)
>   ... more convenience methods ...
>}
>
> If you want to peek under the hood, try compiling a simple X.scala file
> with the line "case class X(a: Int, b: Double)", then taking apart the
> generated X.class and X$.class (e.g., "javap X.class").
>
> More info here
> , here
>  and in Programming
> in Scala  ch 15.
>
> Hope that helps!
>
> Best,
>
> Patrick
>
> On Thu, Feb 26, 2015 at 6:37 AM, anamika gupta 
> wrote:
>
>> I am now getting the following error. I cross-checked my types and
>> corrected three of them i.e. r26-->String, r27-->Timestamp,
>> r28-->Timestamp. This error still persists.
>>
>> scala>
>> sc.textFile("/home/cdhuser/Desktop/Sdp_d.csv").map(_.split(",")).map { r =>
>>  | val upto_time = sdf.parse(r(23).trim);
>>  | calendar.setTime(upto_time);
>>  | val r23 = new java.sql.Timestamp(upto_time.getTime)
>>  | val insert_time = sdf.parse(r(27).trim)
>>  | calendar.setTime(insert_time)
>>  | val r27 = new java.sql.Timestamp(insert_time.getTime)
>>  | val last_upd_time = sdf.parse(r(28).trim)
>>  | calendar.setTime(last_upd_time)
>>  | val r28 = new java.sql.Timestamp(last_upd_time.getTime)
>>  | new sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim,
>> r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim,
>> r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim,
>> r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim,
>> r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim,
>> r(25).trim, r(26).trim, r27, r28)
>>  | }.registerAsTable("sdp_d")
>>
>> :26: error: type mismatch;
>>  found   : Int
>>  required: Option[Int]
>>   new sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim,
>> r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim,
>> r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim,
>> r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim,
>> r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim,
>> r(25).trim, r(26).trim, r27, r28)
>>
>> On Wed, Feb 25, 2015 at 2:32 PM, Akhil Das 
>> wrote:
>>
>>> It says sdp_d not found, since it is a class you need to instantiate it
>>> once. like:
>>>
>>> sc.textFile("derby.log").map(_.split(",")).map( r => {
>>>   val upto_time = sdf.parse(r(23).trim);
>>>   calendar.setTime(upto_time);
>>>   val r23 = new java.sql.Timestamp(upto_time.getTime);
>>>
>>>   val insert_time = sdf.parse(r(26).trim);
>>>   calendar.setTime(insert_time);
>>>   val r26 = new java.sql.Timestamp(insert_time.getTime);
>>>
>>>   val last_upd_time = sdf.parse(r(27).trim);
>>>   calendar.setTime(last_upd_time);
>>>   val r27 = new java.sql.Timestamp(last_upd_time.getTime);
>>>
>>>   *new* *sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim,
>>> r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim,
>>> r(8).trim.toDouble

Re: Number of Executors per worker process

2015-02-26 Thread Jeffrey Jedele
Hi Spico,

Yes, I think an "executor core" in Spark is basically a thread in a worker
pool. It's recommended to have one executor core per physical core on your
machine for best performance, but I think in theory you can create as many
threads as your OS allows.

For deployment:
There seems to be the actual worker JVM which coordinates the work on a
worker node. I don't think the actual thread pool lives in there, but a
separate JVM is created for each application that has cores allocated on
the node. Otherwise it would be rather hard to impose memory limits on
application level and it would have serious disadvantages regarding
stability.

You can check this behavior by looing at the processes on your machine:
ps aux | grep spark.deploy => will show  master, worker (coordinator) and
driver JVMs
ps aux | grep spark.executor => will show the actual worker JVMs

2015-02-25 14:23 GMT+01:00 Spico Florin :

> Hello!
>  I've read the documentation about the spark architecture, I have the
> following questions:
> 1: how many executors can be on a single worker process (JMV)?
> 2:Should I think executor like a Java Thread Executor where the pool size
> is equal with the number of the given cores (set up by the
> SPARK_WORKER_CORES)?
> 3. If the worker can have many executors, how this is handled by the
> Spark? Or can I handle by myself to set up the number of executors per JVM?
>
> I look forward for your answers.
>   Regards,
>   Florin
>


Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Guillermo Ortiz
One last time to be sure I got it right, the executing sequence here
goes like this?:

val usersMap = contacts.collectAsMap()
#The contacts RDD is collected by the executors and sent to the
driver, the executors delete the rdd
contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()
#The userMap object is sent again to the executors to run the code,
and with the collect(), the result is sent again back to the driver


2015-02-26 11:57 GMT+01:00 Sean Owen :
> Yes, in that code, usersMap has been serialized to every executor.
> I thought you were referring to accessing the copy in the driver.
>
> On Thu, Feb 26, 2015 at 10:47 AM, Guillermo Ortiz  
> wrote:
>> Isn't it "contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()"
>> executed in the executors?  why is it executed in the driver?
>> contacts are not a local object, right?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Sean Owen
Yes, but there is no concept of executors 'deleting' an RDD. And you
would want to broadcast the usersMap if you're using it this way.

On Thu, Feb 26, 2015 at 11:26 AM, Guillermo Ortiz  wrote:
> One last time to be sure I got it right, the executing sequence here
> goes like this?:
>
> val usersMap = contacts.collectAsMap()
> #The contacts RDD is collected by the executors and sent to the
> driver, the executors delete the rdd
> contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()
> #The userMap object is sent again to the executors to run the code,
> and with the collect(), the result is sent again back to the driver
>
>
> 2015-02-26 11:57 GMT+01:00 Sean Owen :
>> Yes, in that code, usersMap has been serialized to every executor.
>> I thought you were referring to accessing the copy in the driver.
>>
>> On Thu, Feb 26, 2015 at 10:47 AM, Guillermo Ortiz  
>> wrote:
>>> Isn't it "contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()"
>>> executed in the executors?  why is it executed in the driver?
>>> contacts are not a local object, right?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Creating hive table on spark ((ERROR))

2015-02-26 Thread sandeep vura
Hi Sparkers,

I am trying to creating hive table in SparkSql.But couldn't able to create
it.Below are the following errors which are generating so far.

java.lang.RuntimeException: java.lang.RuntimeException: Unable to
instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231)
at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229)
at
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229)
at
org.apache.spark.sql.hive.HiveMetastoreCatalog.(HiveMetastoreCatalog.scala:55)
at
org.apache.spark.sql.hive.HiveContext$$anon$1.(HiveContext.scala:253)
at
org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:253)
at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:253)
at
org.apache.spark.sql.hive.HiveContext$$anon$3.(HiveContext.scala:263)
at
org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:263)
at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:262)
at
org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at
org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
at
org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.(SchemaRDD.scala:108)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
at $iwC$$iwC$$iwC$$iwC.(:15)
at $iwC$$iwC$$iwC.(:20)
at $iwC$$iwC.(:22)
at $iwC.(:24)
at (:26)
at .(:30)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:62)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
... 59 more
Caused by: java.lang.reflect.Invoc

Stand-alone Spark on windows

2015-02-26 Thread Sergey Gerasimov
Hi!

I downloaded Spark binaries unpacked and could successfully run pyspark shell 
and write and execute some code here

BUT

I failed with submitting stand-alone python scripts or jar files via 
spark-submit:
spark-submit pi.py

I always get exception stack trace with NullPointerException in 
java.lang.ProcessBuilder.start().

What could be wrong?

Should I start some scripts before spark-submit?

I have windows 7 and spark 1.2.1

Sergey.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Drill 1.2.1 - error

2015-02-26 Thread Jahagirdar, Madhu
All,

We are getting the below error when we are using Drill JDBC driver with spark, 
please let us know what could be the issue.


java.lang.IllegalAccessError: class io.netty.buffer.UnsafeDirectLittleEndian 
cannot access its superclass io.netty.buffer.WrappedByteBuf
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at 
org.apache.drill.exec.memory.TopLevelAllocator.(TopLevelAllocator.java:43)
at 
org.apache.drill.exec.memory.TopLevelAllocator.(TopLevelAllocator.java:68)
at org.apache.drill.jdbc.DrillConnectionImpl.(DrillConnectionImpl.java:91)
at 
org.apache.drill.jdbc.DrillJdbc41Factory$DrillJdbc41Connection.(DrillJdbc41Factory.java:88)
at 
org.apache.drill.jdbc.DrillJdbc41Factory.newDrillConnection(DrillJdbc41Factory.java:57)
at 
org.apache.drill.jdbc.DrillJdbc41Factory.newDrillConnection(DrillJdbc41Factory.java:43)
at org.apache.drill.jdbc.DrillFactory.newConnection(DrillFactory.java:51)
at 
net.hydromatic.avatica.UnregisteredDriver.connect(UnregisteredDriver.java:126)
at java.sql.DriverManager.getConnection(DriverManager.java:571)
at java.sql.DriverManager.getConnection(DriverManager.java:233)
at $line13.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:17)
at $line13.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:17)
at org.apache.spark.rdd.JdbcRDD$$anon$1.(JdbcRDD.scala:76)
at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:73)
at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:53)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:261)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/26 10:16:03 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 
(TID 0)
java.lang.IllegalAccessError: class io.netty.buffer.UnsafeDirectLittleEndian 
cannot access its superclass io.netty.buffer.WrappedByteBuf
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at 
org.apache.drill.exec.memory.TopLevelAllocator.(TopLevelAllocator.java:43)
at 
org.apache.drill.exec.memory.TopLevelAllocator.(TopLevelAllocator.java:68)
at org.apache.drill.jdbc.DrillConnectionImpl.(DrillConnectionImpl.java:91)
at 
org.apache.drill.jdbc.DrillJdbc41Factory$DrillJdbc41Connection.(DrillJdbc41Factory.java:88)
at 
org.apache.drill.jdbc.DrillJdbc41Factory.newDrillConnection(DrillJdbc41Factory.java:57)
at 
org.apache.drill.jdbc.DrillJdbc41Factory.newDrillConnection(DrillJdbc41Factory.java:43)
at org.apache.drill.jdbc.DrillFactory.newConnection(DrillFactory.java:51)
at 
net.hydromatic.avatica.UnregisteredDriver.connect(UnregisteredDriver.java:126)
at java.sql.DriverManager.getConnection(DriverManager.java:571)
at java.sql.DriverManager.getConnection(DriverManager.java:233)
at $line13.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:17)
at $line13.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:17)
at org.apache.spark.rdd.JdbcRDD$$anon$1.(JdbcRDD.scala:76)
at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:73)
at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:53)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:261)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at org.apache.spark.scheduler.
Regards,
Madhu Jahagirdar


The information contained in this message may be confidential and legally 
protected under applicable law. The message is intended solely for the 
addressee(s). If you are not the intended recipient, you are hereby notified 
that any use, forwarding, di

Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Paweł Szulc
Correct me if I'm wrong, but he can actually run thus code without
broadcasting the users map,  however the code will be less efficient.

czw., 26 lut 2015, 12:31 PM Sean Owen użytkownik 
napisał:

> Yes, but there is no concept of executors 'deleting' an RDD. And you
> would want to broadcast the usersMap if you're using it this way.
>
> On Thu, Feb 26, 2015 at 11:26 AM, Guillermo Ortiz 
> wrote:
> > One last time to be sure I got it right, the executing sequence here
> > goes like this?:
> >
> > val usersMap = contacts.collectAsMap()
> > #The contacts RDD is collected by the executors and sent to the
> > driver, the executors delete the rdd
> > contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()
> > #The userMap object is sent again to the executors to run the code,
> > and with the collect(), the result is sent again back to the driver
> >
> >
> > 2015-02-26 11:57 GMT+01:00 Sean Owen :
> >> Yes, in that code, usersMap has been serialized to every executor.
> >> I thought you were referring to accessing the copy in the driver.
> >>
> >> On Thu, Feb 26, 2015 at 10:47 AM, Guillermo Ortiz 
> wrote:
> >>> Isn't it "contacts.map(v => (v._1, (usersMap(v._1), v._2))).collect()"
> >>> executed in the executors?  why is it executed in the driver?
> >>> contacts are not a local object, right?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: CollectAsMap, Broadcasting.

2015-02-26 Thread Sean Owen
Yes that's correct; it works but broadcasting would be more efficient.

On Thu, Feb 26, 2015 at 1:20 PM, Paweł Szulc  wrote:
> Correct me if I'm wrong, but he can actually run thus code without
> broadcasting the users map,  however the code will be less efficient.
>
>
> czw., 26 lut 2015, 12:31 PM Sean Owen użytkownik 
> napisał:
>>
>> Yes, but there is no concept of executors 'deleting' an RDD. And you
>> would want to broadcast the usersMap if you're using it this way.
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread Emre Sevinc
Hello,

I have a piece of code to force the materialization of RDDs in my Spark
Streaming program, and I'm trying to understand which method is faster and
has less memory consumption:

  javaDStream.foreachRDD(new Function, Void>() {
  @Override
  public Void call(JavaRDD stringJavaRDD) throws Exception {

//stringJavaRDD.collect();

   // or count?

//stringJavaRDD.count();

return null;
  }
});


I've checked the source code of Spark at
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala,
and see that collect() is defined as:

  def collect(): Array[T] = {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
   Array.concat(results: _*)
  }

and count() defined as:

  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

Therefore I think calling the count() method is faster and/or consumes less
memory, but I wanted to be sure.

Anyone cares to comment?


-- 
Emre Sevinç
http://www.bigindustries.be/


Re: Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread francois . garillot
The short answer:

count(), as the sum can be partially aggregated on the mappers.




The long answer:

http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html



—
FG

On Thu, Feb 26, 2015 at 2:28 PM, Emre Sevinc 
wrote:

> Hello,
> I have a piece of code to force the materialization of RDDs in my Spark
> Streaming program, and I'm trying to understand which method is faster and
> has less memory consumption:
>   javaDStream.foreachRDD(new Function, Void>() {
>   @Override
>   public Void call(JavaRDD stringJavaRDD) throws Exception {
> //stringJavaRDD.collect();
>// or count?
> //stringJavaRDD.count();
> return null;
>   }
> });
> I've checked the source code of Spark at
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala,
> and see that collect() is defined as:
>   def collect(): Array[T] = {
> val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
>Array.concat(results: _*)
>   }
> and count() defined as:
>   def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
> Therefore I think calling the count() method is faster and/or consumes less
> memory, but I wanted to be sure.
> Anyone cares to comment?
> -- 
> Emre Sevinç
> http://www.bigindustries.be/

Re: Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread Emre Sevinc
Francois,

Thank you for quickly verifying.

Kind regards,
Emre Sevinç

On Thu, Feb 26, 2015 at 2:32 PM,  wrote:

> The short answer:
> count(), as the sum can be partially aggregated on the mappers.
>
> The long answer:
>
> http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html
>
> —
> FG
>
>
> On Thu, Feb 26, 2015 at 2:28 PM, Emre Sevinc 
> wrote:
>
>>  Hello,
>>
>> I have a piece of code to force the materialization of RDDs in my Spark
>> Streaming program, and I'm trying to understand which method is faster and
>> has less memory consumption:
>>
>>   javaDStream.foreachRDD(new Function, Void>() {
>>   @Override
>>   public Void call(JavaRDD stringJavaRDD) throws Exception {
>>
>> //stringJavaRDD.collect();
>>
>>// or count?
>>
>> //stringJavaRDD.count();
>>
>> return null;
>>   }
>> });
>>
>>
>> I've checked the source code of Spark at
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala,
>> and see that collect() is defined as:
>>
>>   def collect(): Array[T] = {
>> val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
>>Array.concat(results: _*)
>>   }
>>
>> and count() defined as:
>>
>>   def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
>>
>> Therefore I think calling the count() method is faster and/or consumes
>> less memory, but I wanted to be sure.
>>
>> Anyone cares to comment?
>>
>>
>> --
>> Emre Sevinç
>> http://www.bigindustries.be/
>>
>>
>


-- 
Emre Sevinc


Re: Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread Sean Owen
Those do quite different things. One counts the data; the other copies
all of the data to the driver.

The fastest way to materialize an RDD that I know of is
foreachPartition(i => None)  (or equivalent no-op VoidFunction in
Java)

On Thu, Feb 26, 2015 at 1:28 PM, Emre Sevinc  wrote:
> Hello,
>
> I have a piece of code to force the materialization of RDDs in my Spark
> Streaming program, and I'm trying to understand which method is faster and
> has less memory consumption:
>
>   javaDStream.foreachRDD(new Function, Void>() {
>   @Override
>   public Void call(JavaRDD stringJavaRDD) throws Exception {
>
> //stringJavaRDD.collect();
>
>// or count?
>
> //stringJavaRDD.count();
>
> return null;
>   }
> });
>
>
> I've checked the source code of Spark at
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala,
> and see that collect() is defined as:
>
>   def collect(): Array[T] = {
> val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
>Array.concat(results: _*)
>   }
>
> and count() defined as:
>
>   def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
>
> Therefore I think calling the count() method is faster and/or consumes less
> memory, but I wanted to be sure.
>
> Anyone cares to comment?
>
>
> --
> Emre Sevinç
> http://www.bigindustries.be/
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread francois . garillot
Note I’m assuming you were going for the size of your RDD, meaning in the 
‘collect’ alternative, you would go for a size() right after the collect().




If you were simply trying to materialize your RDD, Sean’s answer is more 
complete.


—
FG

On Thu, Feb 26, 2015 at 2:33 PM, Emre Sevinc 
wrote:

> Francois,
> Thank you for quickly verifying.
> Kind regards,
> Emre Sevinç
> On Thu, Feb 26, 2015 at 2:32 PM,  wrote:
>> The short answer:
>> count(), as the sum can be partially aggregated on the mappers.
>>
>> The long answer:
>>
>> http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html
>>
>> —
>> FG
>>
>>
>> On Thu, Feb 26, 2015 at 2:28 PM, Emre Sevinc 
>> wrote:
>>
>>>  Hello,
>>>
>>> I have a piece of code to force the materialization of RDDs in my Spark
>>> Streaming program, and I'm trying to understand which method is faster and
>>> has less memory consumption:
>>>
>>>   javaDStream.foreachRDD(new Function, Void>() {
>>>   @Override
>>>   public Void call(JavaRDD stringJavaRDD) throws Exception {
>>>
>>> //stringJavaRDD.collect();
>>>
>>>// or count?
>>>
>>> //stringJavaRDD.count();
>>>
>>> return null;
>>>   }
>>> });
>>>
>>>
>>> I've checked the source code of Spark at
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala,
>>> and see that collect() is defined as:
>>>
>>>   def collect(): Array[T] = {
>>> val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
>>>Array.concat(results: _*)
>>>   }
>>>
>>> and count() defined as:
>>>
>>>   def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
>>>
>>> Therefore I think calling the count() method is faster and/or consumes
>>> less memory, but I wanted to be sure.
>>>
>>> Anyone cares to comment?
>>>
>>>
>>> --
>>> Emre Sevinç
>>> http://www.bigindustries.be/
>>>
>>>
>>
> -- 
> Emre Sevinc

[ANNOUNCE] Apache MRQL 0.9.4-incubating released

2015-02-26 Thread Leonidas Fegaras

The Apache MRQL team is pleased to announce the release of
Apache MRQL 0.9.4-incubating. This is our second Apache release.
Apache MRQL is a query processing and optimization system for
large-scale, distributed data analysis, built on top of
Apache Hadoop, Hama, Spark, and Flink.

The release artifacts are available at:
http://www.apache.org/dyn/closer.cgi/incubator/mrql
Release notes:
http://mrql.incubator.apache.org/ReleaseNotes-0.9.4.html
Installation instructions at:
http://mrql.incubator.apache.org/getting_started.html

We welcome your help, feedback, and suggestions. For more information
on how to report problems and to get involved, please visit the project
website at: http://mrql.incubator.apache.org/
and the project wiki at: http://wiki.apache.org/mrql/

The Apache MRQL Team

Disclaimer:
Apache MRQL is an effort undergoing incubation at The Apache Software
Foundation (ASF) sponsored by the Apache Incubator PMC. Incubation is
required of all newly accepted projects until a further review
indicates that the infrastructure, communications, and decision making
process have stabilized in a manner consistent with other successful
ASF projects. While incubation status is not necessarily a reflection
of the completeness or stability of the code, it does indicate that
the project has yet to be fully endorsed by the ASF.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: different result from implicit ALS with explicit ALS

2015-02-26 Thread lisendong
okay, I have brought this to the user@list

I don’t think the negative pair should be omitted…..


if the score of all of the pairs are 1.0, the result will be worse…I have tried…


Best Regards, 
Sendong Li
> 在 2015年2月26日,下午10:07,Sean Owen  写道:
> 
> Yes, I mean, do not generate a Rating for these data points. What then?
> 
> Also would you care to bring this to the user@ list? it's kind of interesting.
> 
> On Thu, Feb 26, 2015 at 2:02 PM, lisendong  wrote:
>> I set the score of ‘0’ interaction user-item pair to 0.0
>> the code is as following:
>> 
>> if (ifclick > 0) {
>>score = 1.0;
>> }
>> else {
>>score = 0.0;
>> }
>> return new Rating(user_id, photo_id, score);
>> 
>> both method use the same ratings rdd
>> 
>> because of the same random seed(1 in my case), the result is stable.
>> 
>> 
>> Best Regards,
>> Sendong Li
>> 
>> 
>> 在 2015年2月26日,下午9:53,Sean Owen  写道:
>> 
>> 
>> I see why you say that, yes.
>> 
>> Are you actually encoding the '0' interactions, or just omitting them?
>> I think you should do the latter.
>> 
>> Is the AUC stable over many runs or did you just run once?
>> 
>> On Thu, Feb 26, 2015 at 1:42 PM, lisendong  wrote:
>> 
>> Hi meng, fotero, sowen:
>> 
>> I’m using ALS with spark 1.0.0, the code should be:
>> https://github.com/apache/spark/blob/branch-1.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
>> 
>> I think the following two method should produce the same (or near) result:
>> 
>> MatrixFactorizationModel model = ALS.train(ratings.rdd(), 30, 30, 0.01, -1,
>> 1);
>> 
>> MatrixFactorizationModel model = ALS.trainImplicit(ratings.rdd(), 30, 30,
>> 0.01, -1, 0, 1);
>> 
>> the data I used is display log, the format of log is as following:
>> 
>> user  item  if-click
>> 
>> 
>> 
>> 
>> 
>> 
>> I use 1.0 as score for click pair, and 0 as score for non-click pair.
>> 
>> in the second method, the alpha is set to zero, so the confidence for
>> positive and negative are both 1.0 (right?)
>> 
>> I think the two method should produce similar result, but the result is :
>> the second method’s result is very bad (the AUC of the first result is 0.7,
>> but the AUC of the second result is only 0.61)
>> 
>> 
>> I could not understand why, could you help me?
>> 
>> 
>> Thank you very much!
>> 
>> Best Regards,
>> Sendong Li
>> 
>> 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



different result from implicit ALS with explicit ALS

2015-02-26 Thread lisendong

I’m using ALS with spark 1.0.0, the code should be:
https://github.com/apache/spark/blob/branch-1.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala

I think the following two method should produce the same (or near) result:

MatrixFactorizationModel model = ALS.train(ratings.rdd(), 30, 30, 0.01, -1,
1);
MatrixFactorizationModel model = ALS.trainImplicit(ratings.rdd(), 30, 30,
0.01, -1, 0, 1);


the data I used is display log, the format of log is as following:

user  item  if-click


I use 1.0 as score for click pair, and 0 as score for non-click pair.

 in the second method, the alpha is set to zero, so the confidence for
positive and negative are both 1.0 (right?)

I think the two method should produce similar result, but the result is : 
the second method’s result is very bad (the AUC of the first result is 0.7,
but the AUC of the second result is only 0.61)


I could not understand why, could you help me?


Thank you very much!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/different-result-from-implicit-ALS-with-explicit-ALS-tp21823.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: different result from implicit ALS with explicit ALS

2015-02-26 Thread Sean Owen
+user

On Thu, Feb 26, 2015 at 2:26 PM, Sean Owen  wrote:

> I think I may have it backwards, and that you are correct to keep the 0
> elements in train() in order to try to reproduce the same result.
>
> The second formulation is called 'weighted regularization' and is used for
> both implicit and explicit feedback, as far as I can see in the code.
>
> Hm, I'm actually not clear why these would produce different results.
> Different code paths are used to be sure, but I'm not yet sure why they
> would give different results.
>
> In general you wouldn't use train() for data like this though, and would
> never set alpha=0.
>
> On Thu, Feb 26, 2015 at 2:15 PM, lisendong  wrote:
>
>> I want to confirm the loss function you use (sorry I’m not so familiar
>> with scala code so I did not understand the source code of mllib)
>>
>> According to the papers :
>>
>>
>> in your implicit feedback ALS, the loss function is (ICDM 2008):
>>
>> in the explicit feedback ALS, the loss function is (Netflix 2008):
>>
>> note that besides the difference of confidence parameter Cui, the 
>> regularization
>> is also different.  does your code also has this difference?
>>
>> Best Regards,
>> Sendong Li
>>
>>
>> 在 2015年2月26日,下午9:42,lisendong  写道:
>>
>> Hi meng, fotero, sowen:
>>
>> I’m using ALS with spark 1.0.0, the code should be:
>>
>> https://github.com/apache/spark/blob/branch-1.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
>>
>> I think the following two method should produce the same (or near) result:
>>
>> MatrixFactorizationModel model = ALS.train(ratings.rdd(), 30, 30, 0.01, -1, 
>> 1);
>>
>> MatrixFactorizationModel model = ALS.trainImplicit(ratings.rdd(), 30, 30, 
>> 0.01, -1, 0, 1);
>>
>> the data I used is display log, the format of log is as following:
>>
>> user  item  if-click
>>
>>
>>
>>
>>
>>
>> I use 1.0 as score for click pair, and 0 as score for non-click pair.
>>
>>  in the second method, the alpha is set to zero, so the confidence for
>> positive and negative are both 1.0 (right?)
>>
>> I think the two method should produce similar result, but the result is :
>>  the second method’s result is very bad (the AUC of the first result is
>> 0.7, but the AUC of the second result is only 0.61)
>>
>>
>> I could not understand why, could you help me?
>>
>>
>> Thank you very much!
>>
>> Best Regards,
>> Sendong Li
>>
>>
>>
>


can not submit job to spark in windows

2015-02-26 Thread sergunok
Hi!

I downloaded and extracted Spark to local folder under windows 7 and have
successfully played with it in pyspark interactive shell.

BUT

When I try to use spark-submit (for example: job-submit pi.py ) I get:

C:\spark-1.2.1-bin-hadoop2.4\bin>spark-submit.cmd pi.py
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/02/26 18:21:37 INFO SecurityManager: Changing view acls to: sergun
15/02/26 18:21:37 INFO SecurityManager: Changing modify acls to: sergun
15/02/26 18:21:37 INFO SecurityManager: SecurityManager: authentication
disabled
; ui acls disabled; users with view permissions: Set(sergun); users with mo
dify permissions: Set(user)
15/02/26 18:21:38 INFO Slf4jLogger: Slf4jLogger started
15/02/26 18:21:38 INFO Remoting: Starting remoting
15/02/26 18:21:39 INFO Remoting: Remoting started; listening on addresses
:[akka
.tcp://sparkDriver@mypc:56640]
15/02/26 18:21:39 INFO Utils: Successfully started service 'sparkDriver' on
port
 56640.
15/02/26 18:21:39 INFO SparkEnv: Registering MapOutputTracker
15/02/26 18:21:39 INFO SparkEnv: Registering BlockManagerMaster
15/02/26 18:21:39 INFO DiskBlockManager: Created local directory at
C:\Users\sergun\AppData\Local\Temp\spark-adddeb0b-d6c8-4720-92e3-05255d46ea66\spark-c65cd4
06-28a4-486d-a1ad-92e4814df6fa
15/02/26 18:21:39 INFO MemoryStore: MemoryStore started with capacity 265.0
MB
15/02/26 18:21:40 WARN NativeCodeLoader: Unable to load native-hadoop
library fo
r your platform... using builtin-java classes where applicable
15/02/26 18:21:40 ERROR Shell: Failed to locate the winutils binary in the
hadoo
p binary path
java.io.IOException: Could not locate executable C:\\bin\winutils.exe in the
Had
oop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
at org.apache.hadoop.util.Shell.(Shell.java:326)
at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
at
org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
at org.apache.hadoop.security.Groups.(Groups.java:77)
at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Group
s.java:240)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupI
nformation.java:255)
at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(User
GroupInformation.java:283)
at
org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:
44)
at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala
:214)
at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.sca
la)
at
org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1873)
at
org.apache.spark.storage.BlockManager.(BlockManager.scala:105)
at
org.apache.spark.storage.BlockManager.(BlockManager.scala:180)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
at org.apache.spark.SparkContext.(SparkContext.scala:240)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.sc
ala:61)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown
Source)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
Sou
rce)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand
.java:79)
at
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Unknown Source)
15/02/26 18:21:41 INFO HttpFileServer: HTTP File server directory is
C:\Users\sergun\AppData\Local\Temp\spark-79f2a924-4fff-432c-abc8-ac9c6c4ee0c7\spark-1f295
e28-f0db-4daf-b877-2a47990b6e88
15/02/26 18:21:41 INFO HttpServer: Starting HTTP Server
15/02/26 18:21:41 INFO Utils: Successfully started service 'HTTP file
server' on
 port 56641.
15/02/26 18:21:41 INFO Utils: Successfully started service 'SparkUI' on port
404
0.
15/02/26 18:21:41 INFO SparkUI: Started SparkUI at http://mypc:4040
15/02/26 18:21:42 INFO Utils: Copying C:\spark-1.2.1-bin-hadoop2.4\bin\pi.py
to
C:\Users\sergun\AppData\Local\Temp\spark-76a21028-ccce-4308-9e70-09c3cfa76477\
spark-56b32155-2779-4345-9597-2bfa6a87a51d\pi.py
Traceback (most recent call last):
  File "C:/spark-1.2.1-bin-hadoop2.4/bin/pi.py", line 29, in 
sc = SparkContext(appName="PythonPi")
  File "C:\spark-1.2.1-bin-hadoop2.4\python\pyspark\context.py", line 105,
in __
init__
conf, jsc)
  File "C:\spark-1.2.1-bin-hadoop2.4\python\pyspark\context.py", lin

Re: Creating hive table on spark ((ERROR))

2015-02-26 Thread Cheng Lian
Seems that you are running Hive metastore over MySQL, but don’t have 
MySQL JDBC driver on classpath:


   Caused by:
   org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException:
   The specified datastore driver (“com.mysql.jdbc.Driver”) was not
   found in the CLASSPATH. Please check your CLASSPATH specification,
   and the name of the driver.

Cheng

On 2/26/15 8:03 PM, sandeep vura wrote:


Hi Sparkers,

I am trying to creating hive table in SparkSql.But couldn't able to 
create it.Below are the following errors which are generating so far.


java.lang.RuntimeException: java.lang.RuntimeException: Unable to 
instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at 
org.apache.spark.sql.hive.HiveContext$anonfun$4.apply(HiveContext.scala:235)
at 
org.apache.spark.sql.hive.HiveContext$anonfun$4.apply(HiveContext.scala:231)

at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231)

at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229)
at 
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229)
at 
org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229)
at 
org.apache.spark.sql.hive.HiveMetastoreCatalog.(HiveMetastoreCatalog.scala:55)
at 
org.apache.spark.sql.hive.HiveContext$anon$1.(HiveContext.scala:253)
at 
org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:253)
at 
org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:253)
at 
org.apache.spark.sql.hive.HiveContext$anon$3.(HiveContext.scala:263)
at 
org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:263)
at 
org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:262)
at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
at 
org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)

at org.apache.spark.sql.SchemaRDD.(SchemaRDD.scala:108)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
at $iwC$iwC$iwC$iwC.(:15)
at $iwC$iwC$iwC.(:20)
at $iwC$iwC.(:22)
at $iwC.(:24)
at (:26)
at .(:30)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:622)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)

at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)

at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
at 
org.apache.spark.repl.SparkILoop$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
at 
org.apache.spark.repl.SparkILoop$anonfun$process$1.apply(SparkILoop.scala:916)
at 
org.apache.spark.repl.SparkILoop$anonfun$process$1.apply(SparkILoop.scala:916)
at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
at 
org.apache.hadoop.hive.metastore.Ret

Re: different result from implicit ALS with explicit ALS

2015-02-26 Thread Xiangrui Meng
Lisen, did you use all m-by-n pairs during training? Implicit model
penalizes unobserved ratings, while explicit model doesn't. -Xiangrui

On Feb 26, 2015 6:26 AM, "Sean Owen"  wrote:
>
> +user
>
> On Thu, Feb 26, 2015 at 2:26 PM, Sean Owen  wrote:
>>
>> I think I may have it backwards, and that you are correct to keep the 0
elements in train() in order to try to reproduce the same result.
>>
>> The second formulation is called 'weighted regularization' and is used
for both implicit and explicit feedback, as far as I can see in the code.
>>
>> Hm, I'm actually not clear why these would produce different results.
Different code paths are used to be sure, but I'm not yet sure why they
would give different results.
>>
>> In general you wouldn't use train() for data like this though, and would
never set alpha=0.
>>
>> On Thu, Feb 26, 2015 at 2:15 PM, lisendong  wrote:
>>>
>>> I want to confirm the loss function you use (sorry I’m not so familiar
with scala code so I did not understand the source code of mllib)
>>>
>>> According to the papers :
>>>
>>>
>>> in your implicit feedback ALS, the loss function is (ICDM 2008):
>>>
>>> in the explicit feedback ALS, the loss function is (Netflix 2008):
>>>
>>> note that besides the difference of confidence parameter Cui,
the regularization is also different.  does your code also has this
difference?
>>>
>>> Best Regards,
>>> Sendong Li
>>>
>>>
 在 2015年2月26日,下午9:42,lisendong  写道:

 Hi meng, fotero, sowen:

 I’m using ALS with spark 1.0.0, the code should be:

https://github.com/apache/spark/blob/branch-1.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala

 I think the following two method should produce the same (or near)
result:

 MatrixFactorizationModel model = ALS.train(ratings.rdd(), 30, 30,
0.01, -1, 1);

 MatrixFactorizationModel model = ALS.trainImplicit(ratings.rdd(), 30,
30, 0.01, -1, 0, 1);

 the data I used is display log, the format of log is as following:

 user  item  if-click






 I use 1.0 as score for click pair, and 0 as score for non-click pair.

  in the second method, the alpha is set to zero, so the confidence for
positive and negative are both 1.0 (right?)

 I think the two method should produce similar result, but the result
is :  the second method’s result is very bad (the AUC of the first result
is 0.7, but the AUC of the second result is only 0.61)


 I could not understand why, could you help me?


 Thank you very much!

 Best Regards,
 Sendong Li
>>>
>>>
>>
>


Re: Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread Emre Sevinc
Hello Sean,

Thank you for your advice. Based on your suggestion, I've modified the code
into the following (and once again admired the easy (!) verbosity of Java
compared to 'complex and hard to understand' brevity (!) of Scala):

javaDStream.foreachRDD(
new Function, Void>() {
  @Override
  public Void call(JavaRDD stringJavaRDD) throws
Exception {
stringJavaRDD.foreachPartition(
new VoidFunction>() {
  @Override
  public void call(Iterator iteratorString)
{
return;
  }
}
);

return null;
  }
});


I've tested the above in my application, and also observed it with Visual
VM but could not see a dramatic speed difference (and small heap usage
difference) compared to my initial version where I just use .count() in a
foreachRDD block.

Nevertheless I'll make more experiments to see if differences come up in
terms of speed/memory.

Kind regards,

Emre Sevinç
http://www.bigindustries.be/





On Thu, Feb 26, 2015 at 2:34 PM, Sean Owen  wrote:

> Those do quite different things. One counts the data; the other copies
> all of the data to the driver.
>
> The fastest way to materialize an RDD that I know of is
> foreachPartition(i => None)  (or equivalent no-op VoidFunction in
> Java)
>
> On Thu, Feb 26, 2015 at 1:28 PM, Emre Sevinc 
> wrote:
> > Hello,
> >
> > I have a piece of code to force the materialization of RDDs in my Spark
> > Streaming program, and I'm trying to understand which method is faster
> and
> > has less memory consumption:
> >
> >   javaDStream.foreachRDD(new Function, Void>() {
> >   @Override
> >   public Void call(JavaRDD stringJavaRDD) throws Exception {
> >
> > //stringJavaRDD.collect();
> >
> >// or count?
> >
> > //stringJavaRDD.count();
> >
> > return null;
> >   }
> > });
> >
> >
> > I've checked the source code of Spark at
> >
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
> ,
> > and see that collect() is defined as:
> >
> >   def collect(): Array[T] = {
> > val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
> >Array.concat(results: _*)
> >   }
> >
> > and count() defined as:
> >
> >   def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
> >
> > Therefore I think calling the count() method is faster and/or consumes
> less
> > memory, but I wanted to be sure.
> >
> > Anyone cares to comment?
> >
> >
> > --
> > Emre Sevinç
> > http://www.bigindustries.be/
> >
>



-- 
Emre Sevinc


Re: different result from implicit ALS with explicit ALS

2015-02-26 Thread 163
oh my god, I think I understood...
In my case, there are three kinds of user-item pairs:

Display and click pair(positive pair)
Display but no-click pair(negative pair)
No-display pair(unobserved pair)

Explicit ALS only consider the first and the second kinds
But implicit ALS consider all the three kinds of pair(and consider the third 
kind as the second pair, because their preference value are all zero and 
confidence are all 1)

So the result are different. right?

Could you please give me some advice, which ALS should I use?
If I use the implicit ALS, how to distinguish the second and the third kind of 
pair:)

My opinion is in my case, I should use explicit ALS ...

Thank you so much

> 在 2015年2月26日,22:41,Xiangrui Meng  写道:
> 
> Lisen, did you use all m-by-n pairs during training? Implicit model penalizes 
> unobserved ratings, while explicit model doesn't. -Xiangrui
> 
> On Feb 26, 2015 6:26 AM, "Sean Owen"  wrote:
> >
> > +user
> >
> > On Thu, Feb 26, 2015 at 2:26 PM, Sean Owen  wrote:
> >>
> >> I think I may have it backwards, and that you are correct to keep the 0 
> >> elements in train() in order to try to reproduce the same result.
> >>
> >> The second formulation is called 'weighted regularization' and is used for 
> >> both implicit and explicit feedback, as far as I can see in the code.
> >>
> >> Hm, I'm actually not clear why these would produce different results. 
> >> Different code paths are used to be sure, but I'm not yet sure why they 
> >> would give different results.
> >>
> >> In general you wouldn't use train() for data like this though, and would 
> >> never set alpha=0.
> >>
> >> On Thu, Feb 26, 2015 at 2:15 PM, lisendong  wrote:
> >>>
> >>> I want to confirm the loss function you use (sorry I’m not so familiar 
> >>> with scala code so I did not understand the source code of mllib)
> >>>
> >>> According to the papers :
> >>>
> >>>
> >>> in your implicit feedback ALS, the loss function is (ICDM 2008):
> >>>
> >>> in the explicit feedback ALS, the loss function is (Netflix 2008):
> >>>
> >>> note that besides the difference of confidence parameter Cui, the 
> >>> regularization is also different.  does your code also has this 
> >>> difference?
> >>>
> >>> Best Regards,
> >>> Sendong Li
> >>>
> >>>
>  在 2015年2月26日,下午9:42,lisendong  写道:
> 
>  Hi meng, fotero, sowen:
> 
>  I’m using ALS with spark 1.0.0, the code should be:
>  https://github.com/apache/spark/blob/branch-1.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
> 
>  I think the following two method should produce the same (or near) 
>  result:
> 
>  MatrixFactorizationModel model = ALS.train(ratings.rdd(), 30, 30, 0.01, 
>  -1, 1);
> 
>  MatrixFactorizationModel model = ALS.trainImplicit(ratings.rdd(), 30, 
>  30, 0.01, -1, 0, 1);
> 
>  the data I used is display log, the format of log is as following:
> 
>  user  item  if-click
> 
> 
> 
> 
> 
> 
>  I use 1.0 as score for click pair, and 0 as score for non-click pair.
> 
>   in the second method, the alpha is set to zero, so the confidence for 
>  positive and negative are both 1.0 (right?)
> 
>  I think the two method should produce similar result, but the result is 
>  :  the second method’s result is very bad (the AUC of the first result 
>  is 0.7, but the AUC of the second result is only 0.61)
> 
> 
>  I could not understand why, could you help me?
> 
> 
>  Thank you very much!
> 
>  Best Regards, 
>  Sendong Li
> >>>
> >>>
> >>
> >


Re: different result from implicit ALS with explicit ALS

2015-02-26 Thread Sean Owen
I believe that's right, and is what I was getting at. yes the implicit
formulation ends up implicitly including every possible interaction in
its loss function, even unobserved ones. That could be the difference.

This is mostly an academic question though. In practice, you have
click-like data and should be using the implicit version for sure.

However you can give negative implicit feedback to the model. You
could consider no-click as a mild, observed, negative interaction.
That is: supply a small negative value for these cases. Unobserved
pairs are not part of the data set. I'd be careful about assuming the
lack of an action carries signal.

On Thu, Feb 26, 2015 at 3:07 PM, 163  wrote:
> oh my god, I think I understood...
> In my case, there are three kinds of user-item pairs:
>
> Display and click pair(positive pair)
> Display but no-click pair(negative pair)
> No-display pair(unobserved pair)
>
> Explicit ALS only consider the first and the second kinds
> But implicit ALS consider all the three kinds of pair(and consider the third
> kind as the second pair, because their preference value are all zero and
> confidence are all 1)
>
> So the result are different. right?
>
> Could you please give me some advice, which ALS should I use?
> If I use the implicit ALS, how to distinguish the second and the third kind
> of pair:)
>
> My opinion is in my case, I should use explicit ALS ...
>
> Thank you so much
>
> 在 2015年2月26日,22:41,Xiangrui Meng  写道:
>
> Lisen, did you use all m-by-n pairs during training? Implicit model
> penalizes unobserved ratings, while explicit model doesn't. -Xiangrui
>
> On Feb 26, 2015 6:26 AM, "Sean Owen"  wrote:
>>
>> +user
>>
>> On Thu, Feb 26, 2015 at 2:26 PM, Sean Owen  wrote:
>>>
>>> I think I may have it backwards, and that you are correct to keep the 0
>>> elements in train() in order to try to reproduce the same result.
>>>
>>> The second formulation is called 'weighted regularization' and is used
>>> for both implicit and explicit feedback, as far as I can see in the code.
>>>
>>> Hm, I'm actually not clear why these would produce different results.
>>> Different code paths are used to be sure, but I'm not yet sure why they
>>> would give different results.
>>>
>>> In general you wouldn't use train() for data like this though, and would
>>> never set alpha=0.
>>>
>>> On Thu, Feb 26, 2015 at 2:15 PM, lisendong  wrote:

 I want to confirm the loss function you use (sorry I’m not so familiar
 with scala code so I did not understand the source code of mllib)

 According to the papers :


 in your implicit feedback ALS, the loss function is (ICDM 2008):

 in the explicit feedback ALS, the loss function is (Netflix 2008):

 note that besides the difference of confidence parameter Cui, the
 regularization is also different.  does your code also has this difference?

 Best Regards,
 Sendong Li


> 在 2015年2月26日,下午9:42,lisendong  写道:
>
> Hi meng, fotero, sowen:
>
> I’m using ALS with spark 1.0.0, the code should be:
>
> https://github.com/apache/spark/blob/branch-1.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
>
> I think the following two method should produce the same (or near)
> result:
>
> MatrixFactorizationModel model = ALS.train(ratings.rdd(), 30, 30, 0.01,
> -1, 1);
>
> MatrixFactorizationModel model = ALS.trainImplicit(ratings.rdd(), 30,
> 30, 0.01, -1, 0, 1);
>
> the data I used is display log, the format of log is as following:
>
> user  item  if-click
>
>
>
>
>
>
> I use 1.0 as score for click pair, and 0 as score for non-click pair.
>
>  in the second method, the alpha is set to zero, so the confidence for
> positive and negative are both 1.0 (right?)
>
> I think the two method should produce similar result, but the result is
> :  the second method’s result is very bad (the AUC of the first result is
> 0.7, but the AUC of the second result is only 0.61)
>
>
> I could not understand why, could you help me?
>
>
> Thank you very much!
>
> Best Regards,
> Sendong Li


>>>
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Job submission via multiple threads

2015-02-26 Thread Kartheek.R
Hi,

I just wrote an application that intends to submit its actions(jobs) via
independent threads keeping in view of the point: "Second, within each
Spark application, multiple “jobs” (Spark actions) may be running
concurrently if they were submitted by different threads", mentioned in:
https://spark.apache.org/docs/0.9.0/job-scheduling.html

val a = sc.textFile("hdfs:\.")
val b =a.sometransformations

Thread1 = {
 /* An action on RDD b */
}

Thread2 = {
/* An action on RDD b */
}

and so on. Basically, different actions are performed on same RDD. Now, I
just want to know if writing code like this with threads makes any
difference in execution when there are no threads written in the code ( It
means all actions are submitted via one thread, I guess). Before that, is
this a way to submits actions via multiple threads?

Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Job-submission-via-multiple-threads-tp21825.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread Sean Owen
Yea we discussed this on the list a short while ago. The extra
overhead of count() is pretty minimal. Still you could wrap this up as
a utility method. There was even a proposal to add some 'materialize'
method to RDD.

PS you can make your Java a little less verbose by omitting "throws
Exception" and "return;"

On Thu, Feb 26, 2015 at 3:07 PM, Emre Sevinc  wrote:
> Hello Sean,
>
> Thank you for your advice. Based on your suggestion, I've modified the code
> into the following (and once again admired the easy (!) verbosity of Java
> compared to 'complex and hard to understand' brevity (!) of Scala):
>
> javaDStream.foreachRDD(
> new Function, Void>() {
>   @Override
>   public Void call(JavaRDD stringJavaRDD) throws
> Exception {
> stringJavaRDD.foreachPartition(
> new VoidFunction>() {
>   @Override
>   public void call(Iterator iteratorString)
> {
> return;
>   }
> }
> );
>
> return null;
>   }
> });
>
>
> I've tested the above in my application, and also observed it with Visual VM
> but could not see a dramatic speed difference (and small heap usage
> difference) compared to my initial version where I just use .count() in a
> foreachRDD block.
>
> Nevertheless I'll make more experiments to see if differences come up in
> terms of speed/memory.
>
> Kind regards,
>
> Emre Sevinç
> http://www.bigindustries.be/
>
>
>
>
>
> On Thu, Feb 26, 2015 at 2:34 PM, Sean Owen  wrote:
>>
>> Those do quite different things. One counts the data; the other copies
>> all of the data to the driver.
>>
>> The fastest way to materialize an RDD that I know of is
>> foreachPartition(i => None)  (or equivalent no-op VoidFunction in
>> Java)
>>
>> On Thu, Feb 26, 2015 at 1:28 PM, Emre Sevinc 
>> wrote:
>> > Hello,
>> >
>> > I have a piece of code to force the materialization of RDDs in my Spark
>> > Streaming program, and I'm trying to understand which method is faster
>> > and
>> > has less memory consumption:
>> >
>> >   javaDStream.foreachRDD(new Function, Void>() {
>> >   @Override
>> >   public Void call(JavaRDD stringJavaRDD) throws Exception {
>> >
>> > //stringJavaRDD.collect();
>> >
>> >// or count?
>> >
>> > //stringJavaRDD.count();
>> >
>> > return null;
>> >   }
>> > });
>> >
>> >
>> > I've checked the source code of Spark at
>> >
>> > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala,
>> > and see that collect() is defined as:
>> >
>> >   def collect(): Array[T] = {
>> > val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
>> >Array.concat(results: _*)
>> >   }
>> >
>> > and count() defined as:
>> >
>> >   def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
>> >
>> > Therefore I think calling the count() method is faster and/or consumes
>> > less
>> > memory, but I wanted to be sure.
>> >
>> > Anyone cares to comment?
>> >
>> >
>> > --
>> > Emre Sevinç
>> > http://www.bigindustries.be/
>> >
>
>
>
>
> --
> Emre Sevinc

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread Emre Sevinc
On Thu, Feb 26, 2015 at 4:20 PM, Sean Owen  wrote:

> Yea we discussed this on the list a short while ago. The extra
> overhead of count() is pretty minimal. Still you could wrap this up as
> a utility method. There was even a proposal to add some 'materialize'
> method to RDD.
>

I definitely would like to vote up for that proposal.

--
Emre


Re: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell

2015-02-26 Thread Patrick Varilly
By the way, the limitation of case classes to 22 parameters was removed in
 Scala 2.11
 (there's some technical
rough edge  past 22 that you most
likely will never run into, but past 255, you run into underlying
limitations of the JVM ).


Best,

Patrick

On Thu, Feb 26, 2015 at 11:58 AM, anamika gupta 
wrote:

> Hi Patrick
>
> Thanks a ton for your in-depth answer. The compilation error is now
> resolved.
>
> Thanks a lot again !!
>
> On Thu, Feb 26, 2015 at 2:40 PM, Patrick Varilly <
> patrick.vari...@dataminded.be> wrote:
>
>> Hi, Akhil,
>>
>> In your definition of sdp_d
>> ,
>> all your fields are of type Option[X].  In Scala, a value of type Option[X]
>> can hold one of two things:
>>
>> 1. None
>> 2. Some(x), where x is of type X
>>
>> So to fix your immediate problem, wrap all your parameters to the sdp_d
>> constructor in Some(...), as follows:
>>
>>new sdp_d(Some(r(0).trim.toInt), Some(r(1).trim.toInt),
>> Some(r(2).trim), ...
>>
>> Your earlier question of why writing sdp_d(...) for a case class works
>> but you need to write new sdp_d(...) for an explicit class, there's a
>> simple answer.  When you create a case class X in scala, Scala also makes a
>> companion object X behind the scenes with an apply method that calls new
>> (see below).  Scala's rules will call this apply method automatically.  So,
>> when you write "X(...)", you're really calling "X.apply(...)" which in turn
>> calls "new X(...)".  (This is the same trick behind writing things like
>> List(1,2,3))  If you don't use a case class, you'd have to make the
>> companion object yourself explicitly.
>>
>> For reference, this statement:
>>
>>case class X(a: A, b: B)
>>
>> is conceptually equivalent to
>>
>>class X(val a: A, val b: B) extends ... {
>>
>>   override def toString: String = // Auto-generated
>>   override def hashCode: Int = // Auto-generated
>>   override def equals(that: Any): Boolean = // Auto-generated
>>
>>   ... more convenience methods ...
>>}
>>
>>object X {
>>   def apply(a: A, b: B) = new X(a, b)
>>   ... more convenience methods ...
>>}
>>
>> If you want to peek under the hood, try compiling a simple X.scala file
>> with the line "case class X(a: Int, b: Double)", then taking apart the
>> generated X.class and X$.class (e.g., "javap X.class").
>>
>> More info here
>> , here
>>  and in Programming
>> in Scala  ch 15.
>>
>> Hope that helps!
>>
>> Best,
>>
>> Patrick
>>
>> On Thu, Feb 26, 2015 at 6:37 AM, anamika gupta 
>> wrote:
>>
>>> I am now getting the following error. I cross-checked my types and
>>> corrected three of them i.e. r26-->String, r27-->Timestamp,
>>> r28-->Timestamp. This error still persists.
>>>
>>> scala>
>>> sc.textFile("/home/cdhuser/Desktop/Sdp_d.csv").map(_.split(",")).map { r =>
>>>  | val upto_time = sdf.parse(r(23).trim);
>>>  | calendar.setTime(upto_time);
>>>  | val r23 = new java.sql.Timestamp(upto_time.getTime)
>>>  | val insert_time = sdf.parse(r(27).trim)
>>>  | calendar.setTime(insert_time)
>>>  | val r27 = new java.sql.Timestamp(insert_time.getTime)
>>>  | val last_upd_time = sdf.parse(r(28).trim)
>>>  | calendar.setTime(last_upd_time)
>>>  | val r28 = new java.sql.Timestamp(last_upd_time.getTime)
>>>  | new sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim,
>>> r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim,
>>> r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim,
>>> r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim,
>>> r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim,
>>> r(25).trim, r(26).trim, r27, r28)
>>>  | }.registerAsTable("sdp_d")
>>>
>>> :26: error: type mismatch;
>>>  found   : Int
>>>  required: Option[Int]
>>>   new sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim,
>>> r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim,
>>> r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim,
>>> r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim,
>>> r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim,
>>> r(25).trim, r(26).trim, r27, r28)
>>>
>>> On Wed, Feb 25, 2015 at 2:32 PM, Akhil Das 
>>> wrote:
>>>
 It says sdp_d not found, since it is a class you need to instantiate it
 once. like:

 sc.textFile("derby.log").map(_.split(",")).map( r => {
   val upto_time = sdf.parse(r(23).trim);
   calendar

Re: different result from implicit ALS with explicit ALS

2015-02-26 Thread 163
Thank you very much for your opinion:)

In our case, maybe it 's dangerous to treat un-observed item as negative 
interaction(although we could give them small confidence, I think they are 
still incredible...)

I will do more experiments and give you feedback:)

Thank you;)


> 在 2015年2月26日,23:16,Sean Owen  写道:
> 
> I believe that's right, and is what I was getting at. yes the implicit
> formulation ends up implicitly including every possible interaction in
> its loss function, even unobserved ones. That could be the difference.
> 
> This is mostly an academic question though. In practice, you have
> click-like data and should be using the implicit version for sure.
> 
> However you can give negative implicit feedback to the model. You
> could consider no-click as a mild, observed, negative interaction.
> That is: supply a small negative value for these cases. Unobserved
> pairs are not part of the data set. I'd be careful about assuming the
> lack of an action carries signal.
> 
>> On Thu, Feb 26, 2015 at 3:07 PM, 163  wrote:
>> oh my god, I think I understood...
>> In my case, there are three kinds of user-item pairs:
>> 
>> Display and click pair(positive pair)
>> Display but no-click pair(negative pair)
>> No-display pair(unobserved pair)
>> 
>> Explicit ALS only consider the first and the second kinds
>> But implicit ALS consider all the three kinds of pair(and consider the third
>> kind as the second pair, because their preference value are all zero and
>> confidence are all 1)
>> 
>> So the result are different. right?
>> 
>> Could you please give me some advice, which ALS should I use?
>> If I use the implicit ALS, how to distinguish the second and the third kind
>> of pair:)
>> 
>> My opinion is in my case, I should use explicit ALS ...
>> 
>> Thank you so much
>> 
>> 在 2015年2月26日,22:41,Xiangrui Meng  写道:
>> 
>> Lisen, did you use all m-by-n pairs during training? Implicit model
>> penalizes unobserved ratings, while explicit model doesn't. -Xiangrui
>> 
>>> On Feb 26, 2015 6:26 AM, "Sean Owen"  wrote:
>>> 
>>> +user
>>> 
 On Thu, Feb 26, 2015 at 2:26 PM, Sean Owen  wrote:
 
 I think I may have it backwards, and that you are correct to keep the 0
 elements in train() in order to try to reproduce the same result.
 
 The second formulation is called 'weighted regularization' and is used
 for both implicit and explicit feedback, as far as I can see in the code.
 
 Hm, I'm actually not clear why these would produce different results.
 Different code paths are used to be sure, but I'm not yet sure why they
 would give different results.
 
 In general you wouldn't use train() for data like this though, and would
 never set alpha=0.
 
> On Thu, Feb 26, 2015 at 2:15 PM, lisendong  wrote:
> 
> I want to confirm the loss function you use (sorry I’m not so familiar
> with scala code so I did not understand the source code of mllib)
> 
> According to the papers :
> 
> 
> in your implicit feedback ALS, the loss function is (ICDM 2008):
> 
> in the explicit feedback ALS, the loss function is (Netflix 2008):
> 
> note that besides the difference of confidence parameter Cui, the
> regularization is also different.  does your code also has this 
> difference?
> 
> Best Regards,
> Sendong Li
> 
> 
>> 在 2015年2月26日,下午9:42,lisendong  写道:
>> 
>> Hi meng, fotero, sowen:
>> 
>> I’m using ALS with spark 1.0.0, the code should be:
>> 
>> https://github.com/apache/spark/blob/branch-1.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
>> 
>> I think the following two method should produce the same (or near)
>> result:
>> 
>> MatrixFactorizationModel model = ALS.train(ratings.rdd(), 30, 30, 0.01,
>> -1, 1);
>> 
>> MatrixFactorizationModel model = ALS.trainImplicit(ratings.rdd(), 30,
>> 30, 0.01, -1, 0, 1);
>> 
>> the data I used is display log, the format of log is as following:
>> 
>> user  item  if-click
>> 
>> 
>> 
>> 
>> 
>> 
>> I use 1.0 as score for click pair, and 0 as score for non-click pair.
>> 
>> in the second method, the alpha is set to zero, so the confidence for
>> positive and negative are both 1.0 (right?)
>> 
>> I think the two method should produce similar result, but the result is
>> :  the second method’s result is very bad (the AUC of the first result is
>> 0.7, but the AUC of the second result is only 0.61)
>> 
>> 
>> I could not understand why, could you help me?
>> 
>> 
>> Thank you very much!
>> 
>> Best Regards,
>> Sendong Li
> 
> 
 
>>> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Considering Spark for large data elements

2015-02-26 Thread Jeffrey Jedele
Hi Rob,
I fear your questions will be hard to answer without additional information
about what kind of simulations you plan to do. int[r][c] basically means
you have a matrix of integers? You could for example map this to a
row-oriented RDD of integer-arrays or to a column oriented RDD of integer
arrays. What the better option is will heavily depend on your workload.
Also have a look at the algebaraic data-structures that come with mllib (
https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.mllib.linalg.Vectors
).

Regards,
Jeff

2015-02-25 23:58 GMT+01:00 Rob Sargent :

>  I have an application which might benefit from Sparks
> distribution/analysis, but I'm worried about the size and structure of my
> data set.  I need to perform several thousand simulation on a rather large
> data set and I need access to all the generated simulations.  The data
> element is largely in int[r][c] where r is 100 to 1000 and c is 20-80K
> (there's more but that array is the bulk of the problem.  I have machines
> and memory capable of doing 6-10 simulations simultaneously in separate
> jvms.  Is this data structure compatible with Sparks RDD notion?
>
> If yes, I will have a slough of how-to-get-started questions, the first of
> which is how to seed the run?  My thinking is to use
> org.apache.spark.api.java.FlatMapFunction starting with an EmptyRDD and
> the seed data.  Would that be the way to go?
>
> Thanks
>


Re: Creating hive table on spark ((ERROR))

2015-02-26 Thread sandeep vura
Oh Thanks for the clarification,I will try to downgrade hive.

On Thu, Feb 26, 2015 at 9:44 PM, Cheng Lian  wrote:

>  You are using a Hive version which is not support by Spark SQL. Spark SQL
> 1.1.x and prior versions only support Hive 0.12.0. Spark SQL 1.2.0 supports
> Hive 0.12.0 or Hive 0.13.1.
>
>
> On 2/27/15 12:12 AM, sandeep vura wrote:
>
> Hi Cheng,
>
>  Thanks the above issue has been resolved.I have configured Remote
> metastore not Local metastore in Hive.
>
>  While creating a table in sparksql another error reflecting on terminal
> . Below error is given below
>
>  sqlContext.sql("LOAD DATA LOCAL INPATH
> '/home/spark12/sandeep_data/sales_pg.csv' INTO TABLE sandeep_test")
> 15/02/26 21:49:24 ERROR Driver: FAILED: RuntimeException
> org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot
> communicate with client version 4
> java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException: Server
> IPC version 9 cannot communicate with client version 4
> at org.apache.hadoop.hive.ql.Context.getScratchDir(Context.java:222)
> at
> org.apache.hadoop.hive.ql.Context.getExternalScratchDir(Context.java:278)
> at
> org.apache.hadoop.hive.ql.Context.getExternalTmpPath(Context.java:344)
> at
> org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer.analyzeInternal(LoadSemanticAnalyzer.java:243)
> at
> org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)
> at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:422)
> at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:322)
> at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:975)
> at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1040)
> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)
> at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
> at
> org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
> at
> org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
> at
> org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
> at
> org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
> at
> org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
> at
> org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
> at org.apache.spark.sql.SchemaRDD.(SchemaRDD.scala:108)
> at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
> at $line13.$read$$iwC$$iwC$$iwC$$iwC.(:15)
> at $line13.$read$$iwC$$iwC$$iwC.(:20)
> at $line13.$read$$iwC$$iwC.(:22)
> at $line13.$read$$iwC.(:24)
> at $line13.$read.(:26)
> at $line13.$read$.(:30)
> at $line13.$read$.()
> at $line13.$eval$.(:7)
> at $line13.$eval$.()
> at $line13.$eval.$print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:622)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
> at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
> at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
> at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
> at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
> at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
> at org.apache.spark.repl.Main$.main(Main.scala:31)
> at org.apache.spark.repl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAcc

Re: Creating hive table on spark ((ERROR))

2015-02-26 Thread Cheng Lian
You are using a Hive version which is not support by Spark SQL. Spark 
SQL 1.1.x and prior versions only support Hive 0.12.0. Spark SQL 1.2.0 
supports Hive 0.12.0 or Hive 0.13.1.


On 2/27/15 12:12 AM, sandeep vura wrote:

Hi Cheng,

Thanks the above issue has been resolved.I have configured Remote 
metastore not Local metastore in Hive.


While creating a table in sparksql another error reflecting on 
terminal . Below error is given below


sqlContext.sql("LOAD DATA LOCAL INPATH 
'/home/spark12/sandeep_data/sales_pg.csv' INTO TABLE sandeep_test")
15/02/26 21:49:24 ERROR Driver: FAILED: RuntimeException 
org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot 
communicate with client version 4
java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException: 
Server IPC version 9 cannot communicate with client version 4

at org.apache.hadoop.hive.ql.Context.getScratchDir(Context.java:222)
at 
org.apache.hadoop.hive.ql.Context.getExternalScratchDir(Context.java:278)
at 
org.apache.hadoop.hive.ql.Context.getExternalTmpPath(Context.java:344)
at 
org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer.analyzeInternal(LoadSemanticAnalyzer.java:243)
at 
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)

at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:422)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:322)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:975)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1040)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)
at 
org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
at 
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
at 
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
at 
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
at 
org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
at 
org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at 
org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)

at org.apache.spark.sql.SchemaRDD.(SchemaRDD.scala:108)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
at $line13.$read$$iwC$$iwC$$iwC$$iwC.(:15)
at $line13.$read$$iwC$$iwC$$iwC.(:20)
at $line13.$read$$iwC$$iwC.(:22)
at $line13.$read$$iwC.(:24)
at $line13.$read.(:26)
at $line13.$read$.(:30)
at $line13.$read$.()
at $line13.$eval$.(:7)
at $line13.$eval$.()
at $line13.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:622)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)

at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)

at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:622)
   

custom inputformat serializable problem

2015-02-26 Thread patcharee

Hi,

I am using custom inputformat and recordreader. This custom recordreader 
has declaration:
public class NetCDFRecordReader extends RecordReaderWRFVariableText>


The WRFVariableText extends Text:
public class WRFVariableText extends org.apache.hadoop.io.Text

The WRFVariableText overrides readFields(DataInput in) and 
write(DataOutput out) method. I understand that this WRFVariableText 
already implements serialization. But I got an exception about 
serialization when I ran my job using the custom inputformat and 
recordreader>>>


Exception in thread "main" org.apache.spark.SparkException: Job aborted 
due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not 
serializable result: no.uni.computing.io.WRFVariableText


Any ideas?

Best,
Patcharee

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: custom inputformat serializable problem

2015-02-26 Thread Sean Owen
If WRFVariableText is a Text, then it implements Writable, not Java
serialization. You can implement Serializable in your class, or
consider reusing SerializableWritable in Spark (note it's a developer
API).

On Thu, Feb 26, 2015 at 4:03 PM, patcharee  wrote:
> Hi,
>
> I am using custom inputformat and recordreader. This custom recordreader has
> declaration:
> public class NetCDFRecordReader extends RecordReader WRFVariableText>
>
> The WRFVariableText extends Text:
> public class WRFVariableText extends org.apache.hadoop.io.Text
>
> The WRFVariableText overrides readFields(DataInput in) and write(DataOutput
> out) method. I understand that this WRFVariableText already implements
> serialization. But I got an exception about serialization when I ran my job
> using the custom inputformat and recordreader>>>
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable
> result: no.uni.computing.io.WRFVariableText
>
> Any ideas?
>
> Best,
> Patcharee
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Cartesian issue with user defined objects

2015-02-26 Thread mrk91
Hello,

I have an issue with the cartesian method. When I use it with the Java types
everything is ok, but when I use it with RDD made of objects defined by me
it has very strage behaviors which depends on whether the RDD is cached or
not (you can see  here

  
what happens).

Is this due to a bug in its implementation or are there any requirements for
the objects to be passed to it?
Thanks.
Best regards.
Marco



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cartesian-issue-with-user-defined-objects-tp21826.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: Help me understand the partition, parallelism in Spark

2015-02-26 Thread java8964
Anyone can share any thoughts related to my questions?
Thanks

From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Help me understand the partition, parallelism in Spark
Date: Wed, 25 Feb 2015 21:58:55 -0500




Hi, Sparkers:
I come from the Hadoop MapReducer world, and try to understand some internal 
information of spark. From the web and this list, I keep seeing people talking 
about increase the parallelism if you get the OOM error. I tried to read 
document as much as possible to understand the RDD partition, and parallelism 
usage in the spark.
I understand that for RDD from HDFS, by default, one partition will be one HDFS 
block, pretty straightforward. I saw that lots of RDD operations support 2nd 
parameter of parallelism. This is the part confuse me. From my understand, the 
parallelism is totally controlled by how many cores you give to your job. 
Adjust that parameter, or "spark.default.parallelism" shouldn't have any impact.
For example, if I have a 10G data in HDFS, and assume the block size is 128M, 
so we get 100 blocks/partitions in RDD. Now if I transfer that RDD to a Pair 
RDD, with 1000 unique keys in the pair RDD, and doing reduceByKey action, using 
200 as the default parallelism. Here is what I assume:
We have 100 partitions, as the data comes from 100 blocks. Most likely the 
spark will generate 100 tasks to read and shuffle them?The 1000 unique keys 
mean the 1000 reducer group, like in MRIf I set the max core to be 50, so there 
will be up to 50 tasks can be run concurrently. The rest tasks just have to 
wait for the core, if there are 50 tasks are running.Since we are doing 
reduceByKey, shuffling will happen. Data will be shuffled into 1000 partitions, 
as we have 1000 unique keys.I don't know these 1000 partitions will be 
processed by how many tasks, maybe this is the parallelism parameter comes 
in?No matter what parallelism this will be, there are ONLY 50 task can be run 
concurrently. So if we set more cores, more partitions' data will be processed 
in the executor (which runs more thread in this case), so more memory needs. I 
don't see how increasing parallelism could help the OOM in this case.In my test 
case of Spark SQL, I gave 24G as the executor heap, my join between 2 big 
datasets keeps getting OOM. I keep increasing the "spark.default.parallelism", 
from 200 to 400, to 2000, even to 4000, no help. What really makes the query 
finish finally without OOM is after I change the "--total-executor-cores" from 
10 to 4.
So my questions are:1) What is the parallelism really mean in the Spark? In the 
simple example above, for reduceByKey, what difference it is between 
parallelism change from 10 to 20?2) When we talk about partition in the spark, 
for the data coming from HDFS, I can understand the partition clearly. For the 
intermediate data, the partition will be same as key, right? For group, 
reducing, join action, uniqueness of the keys will be partition. Is that 
correct?3) Why increasing parallelism could help OOM? I don't get this part. 
From my limited experience, adjusting the core count really matters for memory.
Thanks
Yong
  

Re: Creating hive table on spark ((ERROR))

2015-02-26 Thread sandeep vura
Hi Cheng,

Thanks the above issue has been resolved.I have configured Remote metastore
not Local metastore in Hive.

While creating a table in sparksql another error reflecting on terminal .
Below error is given below

sqlContext.sql("LOAD DATA LOCAL INPATH
'/home/spark12/sandeep_data/sales_pg.csv' INTO TABLE sandeep_test")
15/02/26 21:49:24 ERROR Driver: FAILED: RuntimeException
org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot
communicate with client version 4
java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException: Server
IPC version 9 cannot communicate with client version 4
at org.apache.hadoop.hive.ql.Context.getScratchDir(Context.java:222)
at
org.apache.hadoop.hive.ql.Context.getExternalScratchDir(Context.java:278)
at
org.apache.hadoop.hive.ql.Context.getExternalTmpPath(Context.java:344)
at
org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer.analyzeInternal(LoadSemanticAnalyzer.java:243)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:422)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:322)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:975)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1040)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
at
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
at
org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
at
org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at
org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.(SchemaRDD.scala:108)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
at $line13.$read$$iwC$$iwC$$iwC$$iwC.(:15)
at $line13.$read$$iwC$$iwC$$iwC.(:20)
at $line13.$read$$iwC$$iwC.(:22)
at $line13.$read$$iwC.(:24)
at $line13.$read.(:26)
at $line13.$read$.(:30)
at $line13.$read$.()
at $line13.$eval$.(:7)
at $line13.$eval$.()
at $line13.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hadoop.ipc.RemoteException: Server IP

Integrating Spark Streaming with Reactive Mongo

2015-02-26 Thread Mike Trienis
Hi All,

I have Spark Streaming setup to write data to a replicated MongoDB database
and would like to understand if there would be any issues using the Reactive
Mongo library to write directly to the mongoDB? My stack is Apache Spark
sitting on top of Cassandra for the datastore, so my thinking is that the
MongoDB connector for Hadoop will not be particular useful for me since I'm
not using HDFS? Is there anything that I'm missing?  

Here is an example of code that I'm planning on using as a starting point
for my implementation. 

LogAggregator

  

Thanks, Mike. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Integrating-Spark-Streaming-with-Reactive-Mongo-tp21828.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to get yarn logs to display in the spark or yarn history-server?

2015-02-26 Thread Christophe Préaud
You can see this information in the yarn web UI using the configuration I 
provided in my former mail (click on the application id, then on logs; you will 
then be automatically redirected to the yarn history server UI).

On 24/02/2015 19:49, Colin Kincaid Williams wrote:
So back to my original question.

I can see the spark logs using the example above:

yarn logs -applicationId application_1424740955620_0009

This shows yarn log aggregation working. I can see the std out and std error in 
that container information above. Then how can I get this information in a 
web-ui ? Is this not currently supported?

On Tue, Feb 24, 2015 at 10:44 AM, Imran Rashid 
mailto:iras...@cloudera.com>> wrote:
the spark history server and the yarn history server are totally independent.  
Spark knows nothing about yarn logs, and vice versa, so unfortunately there 
isn't any way to get all the info in one place.

On Tue, Feb 24, 2015 at 12:36 PM, Colin Kincaid Williams 
mailto:disc...@uw.edu>> wrote:
Looks like in my tired state, I didn't mention spark the whole time. However, 
it might be implied by the application log above. Spark log aggregation appears 
to be working, since I can run the yarn command above. I do have yarn logging 
setup for the yarn history server. I was trying to use the spark 
history-server, but maybe I should try setting

spark.yarn.historyServer.address

to the yarn history-server, instead of the spark history-server? I tried this 
configuration when I started, but didn't have much luck.

Are you getting your spark apps run in yarn client or cluster mode in your yarn 
history server? If so can you share any spark settings?

On Tue, Feb 24, 2015 at 8:48 AM, Christophe Préaud 
mailto:christophe.pre...@kelkoo.com>> wrote:
Hi Colin,

Here is how I have configured my hadoop cluster to have yarn logs available 
through both the yarn CLI and the _yarn_ history server (with gzip compression 
and 10 days retention):

1. Add the following properties in the yarn-site.xml on each node managers and 
on the resource manager:
  
yarn.log-aggregation-enable
true
  
  
yarn.log-aggregation.retain-seconds
864000
  
  
yarn.log.server.url

http://dc1-kdp-dev-hadoop-03.dev.dc1.kelkoo.net:19888/jobhistory/logs
  
  
yarn.nodemanager.log-aggregation.compression-type
gz
  

2. Restart yarn and then start the yarn history server on the server defined in 
the yarn.log.server.url property above:

/opt/hadoop/sbin/mr-jobhistory-daemon.sh stop historyserver # should fail if 
historyserver is not yet started
/opt/hadoop/sbin/stop-yarn.sh
/opt/hadoop/sbin/start-yarn.sh
/opt/hadoop/sbin/mr-jobhistory-daemon.sh start historyserver


It may be slightly different for you if the resource manager and the history 
server are not on the same machine.

Hope it will work for you as well!
Christophe.

On 24/02/2015 06:31, Colin Kincaid Williams wrote:
> Hi,
>
> I have been trying to get my yarn logs to display in the spark history-server 
> or yarn history-server. I can see the log information
>
>
> yarn logs -applicationId application_1424740955620_0009
> 15/02/23 22:15:14 INFO client.ConfiguredRMFailoverProxyProvider: Failing over 
> to us3sm2hbqa04r07-comp-prod-local
>
>
> Container: container_1424740955620_0009_01_02 on 
> us3sm2hbqa07r07.comp.prod.local_8041
> ===
> LogType: stderr
> LogLength: 0
> Log Contents:
>
> LogType: stdout
> LogLength: 897
> Log Contents:
> [GC [PSYoungGen: 262656K->23808K(306176K)] 262656K->23880K(1005568K), 
> 0.0283450 secs] [Times: user=0.14 sys=0.03, real=0.03 secs]
> Heap
>  PSYoungGen  total 306176K, used 111279K [0xeaa8, 
> 0x0001, 0x0001)
>   eden space 262656K, 33% used 
> [0xeaa8,0xeffebbe0,0xfab0)
>   from space 43520K, 54% used 
> [0xfab0,0xfc240320,0xfd58)
>   to   space 43520K, 0% used 
> [0xfd58,0xfd58,0x0001)
>  ParOldGen   total 699392K, used 72K [0xbff8, 
> 0xeaa8, 0xeaa8)
>   object space 699392K, 0% used 
> [0xbff8,0xbff92010,0xeaa8)
>  PSPermGen   total 35328K, used 34892K [0xbad8, 
> 0xbd00, 0xbff8)
>   object space 35328K, 98% used 
> [0xbad8,0xbcf93088,0xbd00)
>
>
>
> Container: container_1424740955620_0009_01_03 on 
> us3sm2hbqa09r09.comp.prod.local_8041
> ===
> LogType: stderr
> LogLength: 0
> Log Contents:
>
> LogType: stdout
> LogLength: 896
> Log Contents:
> [GC [PSYoungGen: 262656K->23725K(306176K)] 262656K->23797K(1005568K), 
> 0.0358650 secs] [Times: user=0.28 sys=0.04, real=0.04 secs]
> Heap
>  PSYoungGen  total 306176K, used 65712K [0xeaa8, 
> 0x0001, 0x0

Iterating on RDDs

2015-02-26 Thread Vijayasarathy Kannan
Hi,

I have the following use case.

(1) I have an RDD of edges of a graph (say R).
(2) do a groupBy on R (by say source vertex) and call a function F on each
group.
(3) collect the results from Fs and do some computation
(4) repeat the above steps until some criteria is met

In (2), the groups are always going to be the same (since R is grouped by
source vertex).

Question:
Is R distributed every iteration (when in (2)) or is it distributed only
once when it is created?

A sample code snippet is below.

while(true) {
  val res = R.groupBy[VertexId](G).flatMap(F)
  res.collect.foreach(func)
  if(criteria)
 break
}

Since the groups remain the same, what is the best way to go about
implementing the above logic?


Re: throughput in the web console?

2015-02-26 Thread Saiph Kappa
By setting spark.eventLog.enabled to true it is possible to see the
application UI after the application has finished its execution, however
the Streaming tab is no longer visible.

For measuring the duration of batches in the code I am doing something like
this:
«wordCharValues.foreachRDD(rdd => {
val startTick = System.currentTimeMillis()
val result = rdd.take(1)
val timeDiff = System.currentTimeMillis() - startTick»

But my quesiton is: is it possible to see the rate/throughput (records/sec)
when I have a stream to process log files that appear in a folder?



On Thu, Feb 26, 2015 at 1:36 AM, Tathagata Das  wrote:

> Yes. # tuples processed in a batch = sum of all the tuples received by all
> the receivers.
>
> In screen shot, there was a batch with 69.9K records, and there was a
> batch which took 1 s 473 ms. These two batches can be the same, can be
> different batches.
>
> TD
>
> On Wed, Feb 25, 2015 at 10:11 AM, Josh J  wrote:
>
>> If I'm using the kafka receiver, can I assume the number of records
>> processed in the batch is the sum of the number of records processed by the
>> kafka receiver?
>>
>> So in the screen shot attached the max rate of tuples processed in a
>> batch is 42.7K + 27.2K = 69.9K tuples processed in a batch with a max
>> processing time of 1 second 473 ms?
>>
>> On Wed, Feb 25, 2015 at 8:48 AM, Akhil Das 
>> wrote:
>>
>>> By throughput you mean Number of events processed etc?
>>>
>>> [image: Inline image 1]
>>>
>>> Streaming tab already have these statistics.
>>>
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, Feb 25, 2015 at 9:59 PM, Josh J  wrote:
>>>

 On Wed, Feb 25, 2015 at 7:54 AM, Akhil Das 
 wrote:

> For SparkStreaming applications, there is already a tab called
> "Streaming" which displays the basic statistics.


 Would I just need to extend this tab to add the throughput?

>>>
>>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>


Apache Ignite vs Apache Spark

2015-02-26 Thread Ognen Duzlevski
Can someone with experience briefly share or summarize the differences
between Ignite and Spark? Are they complementary? Totally unrelated?
Overlapping? Seems like ignite has reached version 1.0, I have never heard
of it until a few days ago and given what is advertised, it sounds pretty
interesting but I am unsure how this relates to or differs from Spark.

Thanks!
Ognen


spark streaming, batchinterval,windowinterval and window sliding interval difference

2015-02-26 Thread Hafiz Mujadid
Can somebody explain the difference between 
batchinterval,windowinterval and window sliding interval with example.
If there is any real time use case of using these parameters?


Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-batchinterval-windowinterval-and-window-sliding-interval-difference-tp21829.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: throughput in the web console?

2015-02-26 Thread Saiph Kappa
One more question: while processing the exact same batch I noticed that
giving more CPUs to the worker does not decrease the duration of the batch.
I tried this with 4 and 8 CPUs. Though, I noticed that giving only 1 CPU
the duration increased, but apart from that the values were pretty similar,
whether I was using 4 or 6 or 8 CPUs.

On Thu, Feb 26, 2015 at 5:35 PM, Saiph Kappa  wrote:

> By setting spark.eventLog.enabled to true it is possible to see the
> application UI after the application has finished its execution, however
> the Streaming tab is no longer visible.
>
> For measuring the duration of batches in the code I am doing something
> like this:
> «wordCharValues.foreachRDD(rdd => {
> val startTick = System.currentTimeMillis()
> val result = rdd.take(1)
> val timeDiff = System.currentTimeMillis() - startTick»
>
> But my quesiton is: is it possible to see the rate/throughput
> (records/sec) when I have a stream to process log files that appear in a
> folder?
>
>
>
> On Thu, Feb 26, 2015 at 1:36 AM, Tathagata Das 
> wrote:
>
>> Yes. # tuples processed in a batch = sum of all the tuples received by
>> all the receivers.
>>
>> In screen shot, there was a batch with 69.9K records, and there was a
>> batch which took 1 s 473 ms. These two batches can be the same, can be
>> different batches.
>>
>> TD
>>
>> On Wed, Feb 25, 2015 at 10:11 AM, Josh J  wrote:
>>
>>> If I'm using the kafka receiver, can I assume the number of records
>>> processed in the batch is the sum of the number of records processed by the
>>> kafka receiver?
>>>
>>> So in the screen shot attached the max rate of tuples processed in a
>>> batch is 42.7K + 27.2K = 69.9K tuples processed in a batch with a max
>>> processing time of 1 second 473 ms?
>>>
>>> On Wed, Feb 25, 2015 at 8:48 AM, Akhil Das 
>>> wrote:
>>>
 By throughput you mean Number of events processed etc?

 [image: Inline image 1]

 Streaming tab already have these statistics.



 Thanks
 Best Regards

 On Wed, Feb 25, 2015 at 9:59 PM, Josh J  wrote:

>
> On Wed, Feb 25, 2015 at 7:54 AM, Akhil Das  > wrote:
>
>> For SparkStreaming applications, there is already a tab called
>> "Streaming" which displays the basic statistics.
>
>
> Would I just need to extend this tab to add the throughput?
>


>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>>
>


Re: Which OutputCommitter to use for S3?

2015-02-26 Thread Thomas Demoor
FYI. We're currently addressing this at the Hadoop level in
https://issues.apache.org/jira/browse/HADOOP-9565


Thomas Demoor

On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath  wrote:

> Just to close the loop in case anyone runs into the same problem I had.
>
> By setting --hadoop-major-version=2 when using the ec2 scripts, everything
> worked fine.
>
> Darin.
>
>
> - Original Message -
> From: Darin McBeath 
> To: Mingyu Kim ; Aaron Davidson 
> Cc: "user@spark.apache.org" 
> Sent: Monday, February 23, 2015 3:16 PM
> Subject: Re: Which OutputCommitter to use for S3?
>
> Thanks.  I think my problem might actually be the other way around.
>
> I'm compiling with hadoop 2,  but when I startup Spark, using the ec2
> scripts, I don't specify a
> -hadoop-major-version and the default is 1.   I'm guessing that if I make
> that a 2 that it might work correctly.  I'll try it and post a response.
>
>
> - Original Message -
> From: Mingyu Kim 
> To: Darin McBeath ; Aaron Davidson <
> ilike...@gmail.com>
> Cc: "user@spark.apache.org" 
> Sent: Monday, February 23, 2015 3:06 PM
> Subject: Re: Which OutputCommitter to use for S3?
>
> Cool, we will start from there. Thanks Aaron and Josh!
>
> Darin, it¹s likely because the DirectOutputCommitter is compiled with
> Hadoop 1 classes and you¹re running it with Hadoop 2.
> org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
> became an interface in Hadoop 2.
>
> Mingyu
>
>
>
>
>
> On 2/23/15, 11:52 AM, "Darin McBeath"  wrote:
>
> >Aaron.  Thanks for the class. Since I'm currently writing Java based
> >Spark applications, I tried converting your class to Java (it seemed
> >pretty straightforward).
> >
> >I set up the use of the class as follows:
> >
> >SparkConf conf = new SparkConf()
> >.set("spark.hadoop.mapred.output.committer.class",
> >"com.elsevier.common.DirectOutputCommitter");
> >
> >And I then try and save a file to S3 (which I believe should use the old
> >hadoop apis).
> >
> >JavaPairRDD newBaselineRDDWritable =
> >reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
> >newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
> >Text.class, Text.class, SequenceFileOutputFormat.class,
> >org.apache.hadoop.io.compress.GzipCodec.class);
> >
> >But, I get the following error message.
> >
> >Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
> >class org.apache.hadoop.mapred.JobContext, but interface was expected
> >at
> >com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
> >java:68)
> >at
> >org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
> >at
> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
> >.scala:1075)
> >at
> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
> >ala:940)
> >at
> >org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
> >ala:902)
> >at
> >org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
> >71)
> >at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
> >
> >In my class, JobContext is an interface of  type
> >org.apache.hadoop.mapred.JobContext.
> >
> >Is there something obvious that I might be doing wrong (or messed up in
> >the translation from Scala to Java) or something I should look into?  I'm
> >using Spark 1.2 with hadoop 2.4.
> >
> >
> >Thanks.
> >
> >Darin.
> >
> >
> >
> >
> >
> >From: Aaron Davidson 
> >To: Andrew Ash 
> >Cc: Josh Rosen ; Mingyu Kim ;
> >"user@spark.apache.org" ; Aaron Davidson
> >
> >Sent: Saturday, February 21, 2015 7:01 PM
> >Subject: Re: Which OutputCommitter to use for S3?
> >
> >
> >
> >Here is the class:
> >
> https://urldefense.proofpoint.com/v2/url?u=https-3A__gist.github.com_aaron
> >dav_c513916e72101bbe14ec&d=AwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6o
> >Onmz8&r=ennQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=_2YAVrYZtQmuKZRf6sFs
> >zOvl_-ZnxmkBPHo1K24TfGE&s=cwSCPKlJO-BJcz4UcGck3xOE2N-4V3eoNvgtFCdMLP8&e=
> >
> >You can use it by setting "mapred.output.committer.class" in the Hadoop
> >configuration (or "spark.hadoop.mapred.output.committer.class" in the
> >Spark configuration). Note that this only works for the old Hadoop APIs,
> >I believe the new Hadoop APIs strongly tie committer to input format (so
> >FileInputFormat always uses FileOutputCommitter), which makes this fix
> >more difficult to apply.
> >
> >
> >
> >
> >On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash 
> wrote:
> >
> >Josh is that class something you guys would consider open sourcing, or
> >would you rather the community step up and create an OutputCommitter
> >implementation optimized for S3?
> >>
> >>
> >>On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen 
> wrote:
> >>
> >>We (Databricks) use our own DirectOutputCommitter implementation, which
> >>is a couple tens of lines of Scala code.  The class would almost
> >>entirely be a no-op except we took some care to properly handle the
> 

Re: Spark excludes "fastutil" dependencies we need

2015-02-26 Thread Marcelo Vanzin
On Wed, Feb 25, 2015 at 8:42 PM, Jim Kleckner  wrote:
> So, should the userClassPathFirst flag work and there is a bug?

Sorry for jumping in the middle of conversation (and probably missing
some of it), but note that this option applies only to executors. If
you're trying to use the class in your driver, there's a separate
option for that.

Also to note is that if you're adding a class that doesn't exist
inside the Spark jars, which seems to be the case, this option should
be irrelevant, since the class loaders should all end up finding the
one copy of the class that you're adding with your app.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkStreaming failing with exception Could not compute split, block input

2015-02-26 Thread Mukesh Jha
On Wed, Feb 25, 2015 at 8:09 PM, Mukesh Jha  wrote:

> My application runs fine for ~3/4 hours and then hits this issue.
>
> On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha 
> wrote:
>
>> Hi Experts,
>>
>> My Spark Job is failing with below error.
>>
>> From the logs I can see that input-3-1424842351600 was added at 5:32:32
>> and was never purged out of memory. Also the available free memory for the
>> executor is *2.1G*.
>>
>> Please help me figure out why executors cannot fetch this input.
>>
>> Txz for any help, Cheers.
>>
>>
>> *Logs*
>> 15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added
>> input-3-1424842351600 in memory on
>> chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB)
>> .
>> .
>> 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
>> input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919
>> (size: 232.3 KB, free: 2.1 GB)
>> 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
>> input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751
>> (size: 291.4 KB, free: 2.1 GB)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in
>> stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in
>> stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in
>> stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in
>> stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in
>> stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in
>> stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in
>> stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in
>> stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in
>> stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in
>> stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
>> bytes)
>> 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage
>> 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
>> Could not compute split, block input-3-1424842351600 not found
>> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage
>> 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
>> Could not compute split, block input-3-1424842355600 not found
>> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>
>> --
>> Thanks & Regards,
>>
>> *Mukesh Jha *
>>
>
>
>
> --
>
>
> Thanks & Regards,
>
> *Mukesh Jha *
>



-- 


Thanks & Regards,

*Mukesh Jha *


Getting to proto buff classes in Spark Context

2015-02-26 Thread necro351 .
Hello everyone,

We are trying to decode a message inside a Spark job that we receive from
Kafka. The message is encoded using Proto Buff. The problem is when
decoding we get class-not-found exceptions. We have tried remedies we found
online in Stack Exchange and mail list archives but nothing seems to work.

(This question is a re-ask, but we really cannot figure this one out.)

We created a standalone repository with a very simple Spark job that
exhibits the above issues. The spark job reads the messages from the FS,
decodes them, and prints them. Its easy to checkout and try to see the
exception yourself: just uncomment the code that prints the messages from
within the RDD. The only sources are the generated Proto Buff java sources
and a small Spark Job that decodes a message. I'd appreciate if anyone
could take a look.

https://github.com/vibhav/spark-protobuf

We tried a couple remedies already.

Setting "spark.files.userClassPathFirst" didn't fix the problem for us. I
am not very familiar with the Spark and Scala environment, so please
correct any incorrect assumptions or statements I make.

However, I don't believe this to be a classpath visibility issue. I wrote a
small helper method to print out the classpath from both the driver and
worker, and the output is identical. (I'm printing out
System.getProperty("java.class.path") -- is there a better way to do this
or check the class path?). You can print out the class paths the same way
we are from the example project above.

Furthermore, userClassPathFirst seems to have a detrimental effect on
otherwise working code, which I cannot explain or do not understand.

For example, I created a trivial RDD as such:

val l = List(1, 2, 3)
sc.makeRDD(l).foreach((x: Int) => {
println(x.toString)
})

With userClassPathFirst set, I encounter a java.lang.ClassCastException
trying to execute that code. Is that to be expected? You can re-create this
issue by commenting out the block of code that tries to print the above in
the example project we linked to above.

We also tried dynamically adding the jar with .addJar to the Spark Context
but this seemed to have no effect.

Thanks in advance for any help y'all can provide.


Re: Apache Ignite vs Apache Spark

2015-02-26 Thread Sean Owen
Ignite is the renaming of GridGain, if that helps. It's like Oracle
Coherence, if that helps. These do share some similarities -- fault
tolerant, in-memory, distributed processing. The pieces they're built
on differ, the architecture differs, the APIs differ. So fairly
different in particulars. I never used the above, so can't be much
more useful.

On Thu, Feb 26, 2015 at 5:46 PM, Ognen Duzlevski
 wrote:
> Can someone with experience briefly share or summarize the differences
> between Ignite and Spark? Are they complementary? Totally unrelated?
> Overlapping? Seems like ignite has reached version 1.0, I have never heard
> of it until a few days ago and given what is advertised, it sounds pretty
> interesting but I am unsure how this relates to or differs from Spark.
>
> Thanks!
> Ognen

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Augment more data to existing MatrixFactorization Model?

2015-02-26 Thread anishm
I am a beginner to the world of Machine Learning and the usage of Apache
Spark. 
I have followed the tutorial at 
https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html#augmenting-matrix-factors

 
, and was succesfully able to develop the application. Now, as it is
required that today's web application need to be powered by real time
recommendations. I would like my model to be ready for new data that keeps
coming on the server. 
The site has quoted:
*
A better way to get the recommendations for you is training a matrix
factorization model first and then augmenting the model using your ratings.*

How do I do that? I am using Python to develop my application. Also, please
tell me how do I persist the model to use it again, or an idea how do I
interface this with a web service.

Thanking you,
Anish Mashankar
A Data Science Enthusiast



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Augment-more-data-to-existing-MatrixFactorization-Model-tp21830.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to map and filter in one step?

2015-02-26 Thread Crystal Xing
Hi,
I have a text file input and I want to parse line by line and map each line
to another format. But at the same time, I want to filter out some lines I
do not need.

I wonder if there is a way to filter out those lines in the map function.

Do I have to do two steps filter and map?  In that way, I have to scan and
parse the lines twice in order to filter and map.

If I map those unwanted line to null and filter out null, will that work?
never tried yet.

Thanks,

Zheng zheng


How to augment data to existing MatrixFactorizationModel?

2015-02-26 Thread anishm
I am a beginner to the world of Machine Learning and the usage of Apache
Spark. 
I have followed the tutorial at 
https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html#augmenting-matrix-factors

 
, and was succesfully able to develop the application. Now, as it is
required that today's web application need to be powered by real time
recommendations. I would like my model to be ready for new data that keeps
coming on the server. 
The site has quoted:
*
A better way to get the recommendations for you is training a matrix
factorization model first and then augmenting the model using your ratings.*

How do I do that? I am using Python to develop my application. Also, please
tell me how do I persist the model to use it again, or an idea how do I
interface this with a web service.

Thanking you,
Anish Mashankar
A Data Science Enthusiast



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-augment-data-to-existing-MatrixFactorizationModel-tp21831.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to map and filter in one step?

2015-02-26 Thread Sean Owen
You can flatMap:

rdd.flatMap { in =>
  if (condition(in)) {
Some(transformation(in))
  } else {
None
  }
}

On Thu, Feb 26, 2015 at 6:39 PM, Crystal Xing  wrote:
> Hi,
> I have a text file input and I want to parse line by line and map each line
> to another format. But at the same time, I want to filter out some lines I
> do not need.
>
> I wonder if there is a way to filter out those lines in the map function.
>
> Do I have to do two steps filter and map?  In that way, I have to scan and
> parse the lines twice in order to filter and map.
>
> If I map those unwanted line to null and filter out null, will that work?
> never tried yet.
>
> Thanks,
>
> Zheng zheng

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: value foreach is not a member of java.util.List[edu.stanford.nlp.util.CoreMap]

2015-02-26 Thread Sean Owen
(Books on Spark are not produced by the Spark project, and this is not
the right place to ask about them. This question was already answered
offline, too.)

On Thu, Feb 26, 2015 at 6:38 PM, Deepak Vohra
 wrote:
>   Ch 6 listing from Advanced Analytics with Spark generates error. The
> listing is
>
> def plainTextToLemmas(text: String, stopWords: Set[String], pipeline:
> StanfordCoreNLP)
> : Seq[String] = {
> val doc = new Annotation(text)
> pipeline.annotate(doc)
> val lemmas = new ArrayBuffer[String]()
> val sentences = doc.get(classOf[SentencesAnnotation])
> for (sentence <- sentences; token <-
> sentence.get(classOf[TokensAnnotation])) {
>   val lemma = token.get(classOf[LemmaAnnotation])
>   if (lemma.length > 2 && !stopWords.contains(lemma) &&
> isOnlyLetters(lemma)) {
> lemmas += lemma.toLowerCase
>   }
> }
> lemmas
>   }
>
> The error is
>
> :37: error: value foreach is not a member of
> java.util.List[edu.stanford.nlp.util.CoreMap]
>for (sentence <- sentences; token <-
> sentence.get(classOf[TokensAnnot
> ation])) {
> ^

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



value foreach is not a member of java.util.List[edu.stanford.nlp.util.CoreMap]

2015-02-26 Thread Deepak Vohra
  Ch 6 listing from Advanced Analytics with Spark generates error. The listing 
is 
def plainTextToLemmas(text: String, stopWords: Set[String],pipeline: 
StanfordCoreNLP)    : Seq[String] = {    val doc = newAnnotation(text)   
pipeline.annotate(doc)    val lemmas = newArrayBuffer[String]()    val 
sentences =doc.get(classOf[SentencesAnnotation])    for (sentence <-sentences; 
token <- sentence.get(classOf[TokensAnnotation])) {  val lemma 
=token.get(classOf[LemmaAnnotation])  if (lemma.length> 2 && 
!stopWords.contains(lemma) && isOnlyLetters(lemma)) {    lemmas 
+=lemma.toLowerCase  }    }    lemmas  }
The error is
:37: error: value foreach is not a member of 
java.util.List[edu.stanford.nlp.util.CoreMap]
   for (sentence <- sentences; token <- sentence.get(classOf[TokensAnnot
ation])) {
    ^

Re: how to map and filter in one step?

2015-02-26 Thread Crystal Xing
I see.
The reason we can use flatmap to map to null but not using map to map to
null is because
flatmap supports map to zero and more  but map only support 1-1 mapping?

It seems Flatmap is more equivalent to haddop's map.


Thanks,

Zheng zhen

On Thu, Feb 26, 2015 at 10:44 AM, Sean Owen  wrote:

> You can flatMap:
>
> rdd.flatMap { in =>
>   if (condition(in)) {
> Some(transformation(in))
>   } else {
> None
>   }
> }
>
> On Thu, Feb 26, 2015 at 6:39 PM, Crystal Xing 
> wrote:
> > Hi,
> > I have a text file input and I want to parse line by line and map each
> line
> > to another format. But at the same time, I want to filter out some lines
> I
> > do not need.
> >
> > I wonder if there is a way to filter out those lines in the map function.
> >
> > Do I have to do two steps filter and map?  In that way, I have to scan
> and
> > parse the lines twice in order to filter and map.
> >
> > If I map those unwanted line to null and filter out null, will that work?
> > never tried yet.
> >
> > Thanks,
> >
> > Zheng zheng
>


Re: how to map and filter in one step?

2015-02-26 Thread Mark Hamstra
rdd.map(foo).filter(bar) and rdd.filter(bar).map(foo) will each already be
pipelined into a single stage, so there generally isn't any need to
complect the map and filter into a single function.

Additionally, there is RDD#collect[U](f: PartialFunction[T, U])(implicit
arg0: ClassTag[U]): RDD[U], which only applies the partial function to
those elements of the RDD for which f is defined.

On Thu, Feb 26, 2015 at 10:49 AM, Crystal Xing 
wrote:

> I see.
> The reason we can use flatmap to map to null but not using map to map to
> null is because
> flatmap supports map to zero and more  but map only support 1-1 mapping?
>
> It seems Flatmap is more equivalent to haddop's map.
>
>
> Thanks,
>
> Zheng zhen
>
> On Thu, Feb 26, 2015 at 10:44 AM, Sean Owen  wrote:
>
>> You can flatMap:
>>
>> rdd.flatMap { in =>
>>   if (condition(in)) {
>> Some(transformation(in))
>>   } else {
>> None
>>   }
>> }
>>
>> On Thu, Feb 26, 2015 at 6:39 PM, Crystal Xing 
>> wrote:
>> > Hi,
>> > I have a text file input and I want to parse line by line and map each
>> line
>> > to another format. But at the same time, I want to filter out some
>> lines I
>> > do not need.
>> >
>> > I wonder if there is a way to filter out those lines in the map
>> function.
>> >
>> > Do I have to do two steps filter and map?  In that way, I have to scan
>> and
>> > parse the lines twice in order to filter and map.
>> >
>> > If I map those unwanted line to null and filter out null, will that
>> work?
>> > never tried yet.
>> >
>> > Thanks,
>> >
>> > Zheng zheng
>>
>
>


Re: throughput in the web console?

2015-02-26 Thread Tathagata Das
If you have one receiver, and you are doing only map-like operaitons then
the process will primarily happen on one machine. To use all the machines,
either receiver in parallel with multiple receivers, or spread out the
computation by explicitly repartitioning the received streams
(DStream.repartition) with sufficient partitions to load balance across
more machines.

TD

On Thu, Feb 26, 2015 at 9:52 AM, Saiph Kappa  wrote:

> One more question: while processing the exact same batch I noticed that
> giving more CPUs to the worker does not decrease the duration of the batch.
> I tried this with 4 and 8 CPUs. Though, I noticed that giving only 1 CPU
> the duration increased, but apart from that the values were pretty similar,
> whether I was using 4 or 6 or 8 CPUs.
>
> On Thu, Feb 26, 2015 at 5:35 PM, Saiph Kappa 
> wrote:
>
>> By setting spark.eventLog.enabled to true it is possible to see the
>> application UI after the application has finished its execution, however
>> the Streaming tab is no longer visible.
>>
>> For measuring the duration of batches in the code I am doing something
>> like this:
>> «wordCharValues.foreachRDD(rdd => {
>> val startTick = System.currentTimeMillis()
>> val result = rdd.take(1)
>> val timeDiff = System.currentTimeMillis() - startTick»
>>
>> But my quesiton is: is it possible to see the rate/throughput
>> (records/sec) when I have a stream to process log files that appear in a
>> folder?
>>
>>
>>
>> On Thu, Feb 26, 2015 at 1:36 AM, Tathagata Das 
>> wrote:
>>
>>> Yes. # tuples processed in a batch = sum of all the tuples received by
>>> all the receivers.
>>>
>>> In screen shot, there was a batch with 69.9K records, and there was a
>>> batch which took 1 s 473 ms. These two batches can be the same, can be
>>> different batches.
>>>
>>> TD
>>>
>>> On Wed, Feb 25, 2015 at 10:11 AM, Josh J  wrote:
>>>
 If I'm using the kafka receiver, can I assume the number of records
 processed in the batch is the sum of the number of records processed by the
 kafka receiver?

 So in the screen shot attached the max rate of tuples processed in a
 batch is 42.7K + 27.2K = 69.9K tuples processed in a batch with a max
 processing time of 1 second 473 ms?

 On Wed, Feb 25, 2015 at 8:48 AM, Akhil Das 
 wrote:

> By throughput you mean Number of events processed etc?
>
> [image: Inline image 1]
>
> Streaming tab already have these statistics.
>
>
>
> Thanks
> Best Regards
>
> On Wed, Feb 25, 2015 at 9:59 PM, Josh J  wrote:
>
>>
>> On Wed, Feb 25, 2015 at 7:54 AM, Akhil Das <
>> ak...@sigmoidanalytics.com> wrote:
>>
>>> For SparkStreaming applications, there is already a tab called
>>> "Streaming" which displays the basic statistics.
>>
>>
>> Would I just need to extend this tab to add the throughput?
>>
>
>


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

>>>
>>>
>>
>


NullPointerException in TaskSetManager

2015-02-26 Thread gtinside
Hi ,

I am trying to run a simple hadoop job (that uses
CassandraHadoopInputOutputWriter) on spark (v1.2 , Hadoop v 1.x) but getting
NullPointerException in TaskSetManager

WARN 2015-02-26 14:21:43,217 [task-result-getter-0] TaskSetManager - Lost
task 14.2 in stage 0.0 (TID 29, devntom003.dev.blackrock.com):
java.lang.NullPointerException

org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1007)
com.bfm.spark.test.CassandraHadoopMigrator$.main(CassandraHadoopMigrator.scala:77)
com.bfm.spark.test.CassandraHadoopMigrator.main(CassandraHadoopMigrator.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


logs doesn't have any stack trace, can someone please help ?

Regards,
Gaurav




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-in-TaskSetManager-tp21832.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Missing tasks

2015-02-26 Thread Akshat Aranya
I am seeing a problem with a Spark job in standalone mode.  Spark master's
web interface shows a task RUNNING on a particular executor, but the logs
of the executor do not show the task being ever assigned to it, that is,
such a line is missing from the log:

15/02/25 16:53:36 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 27.

The missing task wasn't the first task executing on the executor.  Also,
subsequent tasks are running on other executor threads on this executor.

Any good way to figure out what happened here?

-Akshat


Re: value foreach is not a member of java.util.List[edu.stanford.nlp.util.CoreMap]

2015-02-26 Thread Deepak Vohra
Sean,

Would you kindly suggest on which forum, mailing list or issues to ask question 
about the AAS book?
Or no  such provision is made?

regards,
Deepak

On Thu, 2/26/15, Sean Owen  wrote:

 Subject: Re: value foreach is not a member of 
java.util.List[edu.stanford.nlp.util.CoreMap]
 To: "Deepak Vohra" 
 Cc: "user@spark.apache.org" 
 Date: Thursday, February 26, 2015, 10:43 AM
 
 (Books on Spark are not
 produced by the Spark project, and this is not
 the right place to ask about them. This
 question was already answered
 offline,
 too.)
 
 On Thu, Feb 26, 2015 at
 6:38 PM, Deepak Vohra
 
 wrote:
 >   Ch 6 listing from
 Advanced Analytics with Spark generates error. The
 > listing is
 >
 > def plainTextToLemmas(text: String,
 stopWords: Set[String], pipeline:
 >
 StanfordCoreNLP)
 >     :
 Seq[String] = {
 >     val doc
 = new Annotation(text)
 > 
    pipeline.annotate(doc)
 > 
    val lemmas = new ArrayBuffer[String]()
 >     val sentences =
 doc.get(classOf[SentencesAnnotation])
 > 
    for (sentence <- sentences; token <-
 > sentence.get(classOf[TokensAnnotation]))
 {
 >       val lemma =
 token.get(classOf[LemmaAnnotation])
 > 
      if (lemma.length > 2 &&
 !stopWords.contains(lemma) &&
 >
 isOnlyLetters(lemma)) {
 >     
    lemmas += lemma.toLowerCase
 >       }
 >     }
 > 
    lemmas
 >   }
 >
 > The error is
 >
 > :37:
 error: value foreach is not a member of
 >
 java.util.List[edu.stanford.nlp.util.CoreMap]
 >            for (sentence <-
 sentences; token <-
 >
 sentence.get(classOf[TokensAnnot
 >
 ation])) {
 >               
              ^
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Getting to proto buff classes in Spark Context

2015-02-26 Thread Akshat Aranya
My guess would be that you are packaging too many things in your job, which
is causing problems with the classpath.  When your jar goes in first, you
get the correct version of protobuf, but some other version of something
else.  When your jar goes in later, other things work, but protobuf
breaks.  This is just a guess though; take a look at what you're packaging
in your jar and look for things that Spark or Kafka could also be using.

On Thu, Feb 26, 2015 at 10:06 AM, necro351 .  wrote:

> Hello everyone,
>
> We are trying to decode a message inside a Spark job that we receive from
> Kafka. The message is encoded using Proto Buff. The problem is when
> decoding we get class-not-found exceptions. We have tried remedies we found
> online in Stack Exchange and mail list archives but nothing seems to work.
>
> (This question is a re-ask, but we really cannot figure this one out.)
>
> We created a standalone repository with a very simple Spark job that
> exhibits the above issues. The spark job reads the messages from the FS,
> decodes them, and prints them. Its easy to checkout and try to see the
> exception yourself: just uncomment the code that prints the messages from
> within the RDD. The only sources are the generated Proto Buff java sources
> and a small Spark Job that decodes a message. I'd appreciate if anyone
> could take a look.
>
> https://github.com/vibhav/spark-protobuf
>
> We tried a couple remedies already.
>
> Setting "spark.files.userClassPathFirst" didn't fix the problem for us. I
> am not very familiar with the Spark and Scala environment, so please
> correct any incorrect assumptions or statements I make.
>
> However, I don't believe this to be a classpath visibility issue. I wrote
> a small helper method to print out the classpath from both the driver and
> worker, and the output is identical. (I'm printing out
> System.getProperty("java.class.path") -- is there a better way to do this
> or check the class path?). You can print out the class paths the same way
> we are from the example project above.
>
> Furthermore, userClassPathFirst seems to have a detrimental effect on
> otherwise working code, which I cannot explain or do not understand.
>
> For example, I created a trivial RDD as such:
>
> val l = List(1, 2, 3)
> sc.makeRDD(l).foreach((x: Int) => {
> println(x.toString)
> })
>
> With userClassPathFirst set, I encounter a java.lang.ClassCastException
> trying to execute that code. Is that to be expected? You can re-create this
> issue by commenting out the block of code that tries to print the above in
> the example project we linked to above.
>
> We also tried dynamically adding the jar with .addJar to the Spark Context
> but this seemed to have no effect.
>
> Thanks in advance for any help y'all can provide.
>


Re: NegativeArraySizeException when doing joins on skewed data

2015-02-26 Thread Imran Rashid
Hi Tristan,

at first I thought you were just hitting another instance of
https://issues.apache.org/jira/browse/SPARK-1391, but I actually think its
entirely related to kryo.  Would it be possible for you to try serializing
your object using kryo, without involving spark at all?  If you are
unfamiliar w/ kryo, you could just try something like this, it would also
be OK to try out the utils in spark to do it, something like:

val outputStream = new
FileOutputStream("/some/local/path/doesn't/really/matter/just/delete/me/afterwards")

val kryoSer = new org.apache.spark.serializer.KryoSerializer(sparkConf)
val kryoStreamSer = kryoSer.newInstance().serializeStream(outputStream)

kryoStreamSer.writeObject(yourBigObject).close()

My guess is that this will fail.  There is a little of spark's wrapping
code involved here too, but I suspect the error is out of our control.
>From the error, it seems like whatever object you are trying to serialize
has more than 2B references:
Caused by: java.lang.NegativeArraySizeException
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.
resize(IdentityObjectIntMap.java:409)

Though that is rather surprising -- it doesn't even seem possible to me
with an object that is only 6 GB.

There are a handful of other size restrictions and tuning parameters that
come with kryo as well.  It would be good for us to write up some docs on
those limitations, as well as work with the kryo devs to see which ones can
be removed.  (Eg., another one that I just noticed from browsing the code
is that even when writing to a stream, kryo has an internal buffer of
limited size, which is periodically flushes.  Perhaps we can get kryo to
turn off that buffer, or we can at least get it to flush more often.)

thanks,
Imran


On Thu, Feb 26, 2015 at 1:06 AM, Tristan Blakers 
wrote:

> I get the same exception simply by doing a large broadcast of about 6GB.
> Note that I’m broadcasting a small number (~3m) of fat objects. There’s
> plenty of free RAM. This and related kryo exceptions seem to crop-up
> whenever an object graph of more than a couple of GB gets passed around.
>
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)
>
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
>
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>
> at
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:84)
>
> at
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>
> at
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>
> at
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>
> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
>
> at
> org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)
>
>
> Caused by: java.lang.NegativeArraySizeException
>
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
>
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
>
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
>
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>
>

Re: GroupByKey causing problem

2015-02-26 Thread Imran Rashid
Hi Tushar,

The most scalable option is probably for you to consider doing some
approximation.  Eg., sample the first to come up with the bucket
boundaries.  Then you can assign data points to buckets without needing to
do a full groupByKey.  You could even have more passes which corrects any
errors in your approximation (eg., see how sortByKey() works, and how it
samples the underlying RDD when constructing the RangePartitioner).  Though
its more passes through the data, it will probably be much faster since you
avoid the expensive groupByKey()

Imran

On Thu, Feb 26, 2015 at 3:38 AM, Tushar Sharma  wrote:

> Hi,
>
> I am trying to apply binning to a large CSV dataset. Here are the steps I
> am taking:
>
> 1. Emit each value of CSV as (ColIndex,(RowIndex,value))
> 2. Then I groupByKey (here ColumnIndex) and get all values of a particular
> index to one node, as I have to work on the collection of all values
> 3. I apply my binning algorithm which is as follows:
> a. Sort the values
> b. Iterate through values and see if it is different than the previous
> one
> if no then add it to the same bin
> if yes then check the size of that bin, if it is greater than a
> particular size (say 5% of wholedataset) then change the bin
> number, else keep the same bin
> c. repeat for each column
>
> Due to this algorithm I can't calculate it partition wise and merge for
> final result. But even for groupByKey I expect it should work , maybe
> slowly, but it should finish. I increased the partition to reduce the
> output of each groupByKey so that it helps in successful completion of the
> process. But even with that it is stuck at the same stage. The log for
> executor says:
>
> ExternalMapAppendOnly(splilling to disk) (Trying ...)
>
> The code works for small CSV files but can't complete for big files.
>
> val inputfile = "hdfs://hm41:9000/user/file1"
> val table = sc.textFile(inputfile,1000)
>
> val withoutHeader: RDD[String] = dropHeader(table)
>
> val kvPairs = withoutHeader.flatMap(retAtrTuple)
>
> //val filter_na = kvPairs.map{case (x,y) => (x,if(y == "NA") "" else y)}
>
> val isNum = kvPairs.map{case (x,y) => (x,isNumeric(y))}.reduceByKey(_&&_)
>
> val numeric_indexes = isNum.filter{case (x,y) => y}.sortByKey().map{case
> (x,y) => x}.collect()
> //val isNum_Arr = isNum.sortByKey().collect()
>
> val kvidx = withoutHeader.zipWithIndex
> //val t = kvidx.map{case (a,b) => retAtrTuple(a).map(x =>(x,b)) }
>
>
> val t = kvidx.flatMap{case (a,b) => retAtrTuple(a).map(x =>(x,b)) }
> val t2 = t.filter{case (a,b) => numeric_indexes contains a._1 }
>
> //val t2 = t.filter{case (a,b) => a._1 ==0 }
> val t3 = t2.map{case ((a,b),c) => (a,(c,b.toDouble))}
> //val t4 = t3.sortBy(_._2._1)
> val t4 = t3.groupByKey.map{case (a,b) =>
> (a,classing_summary(b.toArray.sortBy(_._2)))}
>
> def dropHeader(data: RDD[String]): RDD[String] = {
> data.mapPartitionsWithIndex((idx, lines) => {
>   if (idx == 0) {
> lines.drop(1)
>   }
>   lines
> })
>   }
>
>
>   def retAtrTuple(x: String) = {
> val newX = x.split(',')
> for (h <- 0 until newX.length)
>   yield (h, newX(h))
>   }
>
> def isNumeric(s: String): Boolean = {
> (allCatch opt s.toDouble).isDefined
>   }
>
> def classing_summary(arr: Array[(Long, Double)]) = {
>   var idx = 0L
>   var value = 0.0
>   var prevValue = Double.MinValue
>   var counter = 1
>   var classSize = 0.0
>   var size = arr.length
>
>   val output = for(i <- 0 until arr.length) yield {
>   idx = arr(i)._1;
>   value = arr(i)._2;
>   if(value==prevValue){
> classSize+=1.0/size;
> //println("both values same")
> //println(idx,value,classSize,counter,classSize);
> prevValue = value;
> (idx,value,counter,classSize);
>   }
>   else if(classSize<(0.05)){
> classSize+=1.0/size;
> //println("both values not same, adding to present bucket")
> //println(idx,value,classSize,counter,classSize);
> prevValue = value;
> (idx,value,counter,classSize);
>   }
>   else {
> classSize = 1.0/size;
> counter +=1;
> //println("both values not same, adding to different bucket")
> //println(idx,value,classSize,counter,classSize);
> prevValue = value;
> (idx,value,counter,classSize);
>   }
>   }
>   output.toArray
> }
>
> Thanks in advance,
>
> Tushar Sharma
>


[SparkSQL, Spark 1.2] UDFs in group by broken?

2015-02-26 Thread Yana Kadiyska
Can someone confirm if they can run UDFs in group by in spark1.2?

I have two builds running -- one from a custom build from early December
(commit 4259ca8dd12) which works fine, and Spark1.2-RC2.

On the latter I get:

 jdbc:hive2://XXX.208:10001> select
from_unixtime(epoch,'-MM-dd-HH'),count(*) count
. . . . . . . . . . . . . . . . . .> from tbl
. . . . . . . . . . . . . . . . . .> group by
from_unixtime(epoch,'-MM-dd-HH');
Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Expression not in GROUP BY:
HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,-MM-dd-HH)
AS _c0#1004, tree:
Aggregate 
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,-MM-dd-HH)],
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,-MM-dd-HH)
AS _c0#1004,COUNT(1) AS count#1003L]
 MetastoreRelation default, tbl, None (state=,code=0)

​

This worked fine on my older build. I don't see a JIRA on this but maybe
I'm not looking right. Can someone please advise?


Re: Cartesian issue with user defined objects

2015-02-26 Thread Imran Rashid
any chance your input RDD is being read from hdfs, and you are running into
this issue (in the docs on SparkContext#hadoopFile):

* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
object for each
* record, directly caching the returned RDD or directly passing it to an
aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable
objects, you should first
* copy them using a `map` function.



On Thu, Feb 26, 2015 at 10:38 AM, mrk91  wrote:

> Hello,
>
> I have an issue with the cartesian method. When I use it with the Java
> types everything is ok, but when I use it with RDD made of objects defined
> by me it has very strage behaviors which depends on whether the RDD is
> cached or not (you can see here
> 
> what happens).
>
> Is this due to a bug in its implementation or are there any requirements
> for the objects to be passed to it?
> Thanks.
> Best regards.
> Marco
> --
> View this message in context: Cartesian issue with user defined objects
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Integrating Spark Streaming with Reactive Mongo

2015-02-26 Thread Mike Trienis
Hi All,

I have Spark Streaming setup to write data to a replicated MongoDB database
and would like to understand if there would be any issues using the
Reactive Mongo library to write directly to the mongoDB? My stack is Apache
Spark sitting on top of Cassandra for the datastore, so my thinking is that
the MongoDB connector for Hadoop will not be particular useful for me since
I'm not using HDFS? Is there anything that I'm missing?

Here is an example of code that I'm planning on using as a starting point
for my implementation.

LogAggregator


Thanks, Mike.


How to tell if one RDD depends on another

2015-02-26 Thread Corey Nolet
Let's say I'm given 2 RDDs and told to store them in a sequence file and
they have the following dependency:

val rdd1 = sparkContext.sequenceFile().cache()
val rdd2 = rdd1.map()


How would I tell programmatically without being the one who built rdd1 and
rdd2 whether or not rdd2 depends on rdd1?

I'm working on a concurrency model for my application and I won't
necessarily know how the two rdds are constructed. What I will know is
whether or not rdd1 is cached but i want to maximum concurrency and run
rdd1 and rdd2 together if rdd2 does not depend on rdd1.


Re: Iterating on RDDs

2015-02-26 Thread Imran Rashid
val grouped = R.groupBy[VertexId](G).persist(StorageLeve.MEMORY_ONLY_SER)
// or whatever persistence makes more sense for you ...
while(true) {
  val res = grouped.flatMap(F)
  res.collect.foreach(func)
  if(criteria)
 break
}

On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan 
wrote:

> Hi,
>
> I have the following use case.
>
> (1) I have an RDD of edges of a graph (say R).
> (2) do a groupBy on R (by say source vertex) and call a function F on each
> group.
> (3) collect the results from Fs and do some computation
> (4) repeat the above steps until some criteria is met
>
> In (2), the groups are always going to be the same (since R is grouped by
> source vertex).
>
> Question:
> Is R distributed every iteration (when in (2)) or is it distributed only
> once when it is created?
>
> A sample code snippet is below.
>
> while(true) {
>   val res = R.groupBy[VertexId](G).flatMap(F)
>   res.collect.foreach(func)
>   if(criteria)
>  break
> }
>
> Since the groups remain the same, what is the best way to go about
> implementing the above logic?
>


Re: How to tell if one RDD depends on another

2015-02-26 Thread Corey Nolet
I see the "rdd.dependencies()" function, does that include ALL the
dependencies of an RDD? Is it safe to assume I can say
"rdd2.dependencies.contains(rdd1)"?

On Thu, Feb 26, 2015 at 4:28 PM, Corey Nolet  wrote:

> Let's say I'm given 2 RDDs and told to store them in a sequence file and
> they have the following dependency:
>
> val rdd1 = sparkContext.sequenceFile().cache()
> val rdd2 = rdd1.map()
>
>
> How would I tell programmatically without being the one who built rdd1 and
> rdd2 whether or not rdd2 depends on rdd1?
>
> I'm working on a concurrency model for my application and I won't
> necessarily know how the two rdds are constructed. What I will know is
> whether or not rdd1 is cached but i want to maximum concurrency and run
> rdd1 and rdd2 together if rdd2 does not depend on rdd1.
>
>


Re: Scheduler hang?

2015-02-26 Thread Victor Tso-Guillen
Okay I confirmed my suspicions of a hang. I made a request that stopped
progressing, though the already-scheduled tasks had finished. I made a
separate request that was small enough not to hang, and it kicked the hung
job enough to finish. I think what's happening is that the scheduler or the
local backend is not kicking the revive offers messaging at the right time,
but I have to dig into the code some more to nail the culprit. Anyone on
these list have experience in those code areas that could help?

On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen  wrote:

> Thanks for the link. Unfortunately, I turned on rdd compression and
> nothing changed. I tried moving netty -> nio and no change :(
>
> On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das 
> wrote:
>
>> Not many that i know of, but i bumped into this one
>> https://issues.apache.org/jira/browse/SPARK-4516
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen 
>> wrote:
>>
>>> Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
>>> dependencies that produce no data?
>>>
>>> On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen 
>>> wrote:
>>>
 The data is small. The job is composed of many small stages.

 * I found that with fewer than 222 the problem exhibits. What will be
 gained by going higher?
 * Pushing up the parallelism only pushes up the boundary at which the
 system appears to hang. I'm worried about some sort of message loss or
 inconsistency.
 * Yes, we are using Kryo.
 * I'll try that, but I'm again a little confused why you're
 recommending this. I'm stumped so might as well?

 On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das >>> > wrote:

> What operation are you trying to do and how big is the data that you
> are operating on?
>
> Here's a few things which you can try:
>
> - Repartition the RDD to a higher number than 222
> - Specify the master as local[*] or local[10]
> - Use Kryo Serializer (.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer"))
> - Enable RDD Compression (.set("spark.rdd.compress","true") )
>
>
> Thanks
> Best Regards
>
> On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen 
> wrote:
>
>> I'm getting this really reliably on Spark 1.2.1. Basically I'm in
>> local mode with parallelism at 8. I have 222 tasks and I never seem to 
>> get
>> far past 40. Usually in the 20s to 30s it will just hang. The last 
>> logging
>> is below, and a screenshot of the UI.
>>
>> 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
>> TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
>> localhost (1/5)
>> 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
>> worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 
>> bytes
>> result sent to driver
>> 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
>> worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes
>> result sent to driver
>> 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
>> TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
>> localhost (2/5)
>> 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
>> TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
>> localhost (3/5)
>> 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch
>> worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes
>> result sent to driver
>> 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
>> TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
>> localhost (4/5)
>>
>> [image: Inline image 1]
>> What should I make of this? Where do I start?
>>
>> Thanks,
>> Victor
>>
>
>

>>>
>>
>


Re: Help me understand the partition, parallelism in Spark

2015-02-26 Thread Imran Rashid
Hi Yong,

mostly correct except for:

>
>- Since we are doing reduceByKey, shuffling will happen. Data will be
>shuffled into 1000 partitions, as we have 1000 unique keys.
>
> no, you will not get 1000 partitions.  Spark has to decide how many
partitions to use before it even knows how many unique keys there are.  If
you have 200 as the default parallelism (or you just explicitly make it the
second parameter to reduceByKey()), then you will get 200 partitions.  The
1000 unique keys will be distributed across the 200 partitions.  ideally
they will be distributed pretty equally, but how they get distributed
depends on the partitioner (by default you will have a HashPartitioner, so
it depends on the hash of your keys).

Note that this is more or less the same as in Hadoop MapReduce.

the amount of parallelism matters b/c there are various places in spark
where there is some overhead proportional to the size of a partition.  So
in your example, if you have 1000 unique keys in 200 partitions, you expect
about 5 unique keys per partitions -- if instead you had 10 partitions,
you'd expect 100 unique keys per partitions, and thus more data and you'd
be more likely to hit an OOM.  But there are many other possible sources of
OOM, so this is definitely not the *only* solution.

Sorry I can't comment in particular about Spark SQL -- hopefully somebody
more knowledgeable can comment on that.



On Wed, Feb 25, 2015 at 8:58 PM, java8964  wrote:

> Hi, Sparkers:
>
> I come from the Hadoop MapReducer world, and try to understand some
> internal information of spark. From the web and this list, I keep seeing
> people talking about increase the parallelism if you get the OOM error. I
> tried to read document as much as possible to understand the RDD partition,
> and parallelism usage in the spark.
>
> I understand that for RDD from HDFS, by default, one partition will be one
> HDFS block, pretty straightforward. I saw that lots of RDD operations
> support 2nd parameter of parallelism. This is the part confuse me. From my
> understand, the parallelism is totally controlled by how many cores you
> give to your job. Adjust that parameter, or "spark.default.parallelism"
> shouldn't have any impact.
>
> For example, if I have a 10G data in HDFS, and assume the block size is
> 128M, so we get 100 blocks/partitions in RDD. Now if I transfer that RDD to
> a Pair RDD, with 1000 unique keys in the pair RDD, and doing reduceByKey
> action, using 200 as the default parallelism. Here is what I assume:
>
>
>- We have 100 partitions, as the data comes from 100 blocks. Most
>likely the spark will generate 100 tasks to read and shuffle them?
>- The 1000 unique keys mean the 1000 reducer group, like in MR
>- If I set the max core to be 50, so there will be up to 50 tasks can
>be run concurrently. The rest tasks just have to wait for the core, if
>there are 50 tasks are running.
>- Since we are doing reduceByKey, shuffling will happen. Data will be
>shuffled into 1000 partitions, as we have 1000 unique keys.
>- I don't know these 1000 partitions will be processed by how many
>tasks, maybe this is the parallelism parameter comes in?
>- No matter what parallelism this will be, there are ONLY 50 task can
>be run concurrently. So if we set more cores, more partitions' data will be
>processed in the executor (which runs more thread in this case), so more
>memory needs. I don't see how increasing parallelism could help the OOM in
>this case.
>- In my test case of Spark SQL, I gave 24G as the executor heap, my
>join between 2 big datasets keeps getting OOM. I keep increasing the
>"spark.default.parallelism", from 200 to 400, to 2000, even to 4000, no
>help. What really makes the query finish finally without OOM is after I
>change the "--total-executor-cores" from 10 to 4.
>
>
> So my questions are:
> 1) What is the parallelism really mean in the Spark? In the simple example
> above, for reduceByKey, what difference it is between parallelism change
> from 10 to 20?
> 2) When we talk about partition in the spark, for the data coming from
> HDFS, I can understand the partition clearly. For the intermediate data,
> the partition will be same as key, right? For group, reducing, join action,
> uniqueness of the keys will be partition. Is that correct?
> 3) Why increasing parallelism could help OOM? I don't get this part. From
> my limited experience, adjusting the core count really matters for memory.
>
> Thanks
>
> Yong
>


Running spark function on parquet without sql

2015-02-26 Thread tridib
Hello Experts,
In one of my projects we are having parquet files and we are using spark SQL
to get our analytics. I am encountering situation where simple SQL is not
getting me what I need or the complex SQL is not supported by Spark Sql. In
scenarios like this I am able to get things done using low level spark
constructs like MapFunction and reducers.

My question is if I create a JavaSchemaRdd on Parquet and use basic spark
constructs, will I still get the benefit of parquets columnar format? Will
my aggregation be as fast as it would have been if I have used SQL?

Please advice.

Thanks & Regards
Tridib



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-function-on-parquet-without-sql-tp21833.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to tell if one RDD depends on another

2015-02-26 Thread Imran Rashid
no, it does not give you transitive dependencies.  You'd have to walk the
tree of dependencies yourself, but that should just be a few lines.

On Thu, Feb 26, 2015 at 3:32 PM, Corey Nolet  wrote:

> I see the "rdd.dependencies()" function, does that include ALL the
> dependencies of an RDD? Is it safe to assume I can say
> "rdd2.dependencies.contains(rdd1)"?
>
> On Thu, Feb 26, 2015 at 4:28 PM, Corey Nolet  wrote:
>
>> Let's say I'm given 2 RDDs and told to store them in a sequence file and
>> they have the following dependency:
>>
>> val rdd1 = sparkContext.sequenceFile().cache()
>> val rdd2 = rdd1.map()
>>
>>
>> How would I tell programmatically without being the one who built rdd1
>> and rdd2 whether or not rdd2 depends on rdd1?
>>
>> I'm working on a concurrency model for my application and I won't
>> necessarily know how the two rdds are constructed. What I will know is
>> whether or not rdd1 is cached but i want to maximum concurrency and run
>> rdd1 and rdd2 together if rdd2 does not depend on rdd1.
>>
>>
>


Re: Help me understand the partition, parallelism in Spark

2015-02-26 Thread Yana Kadiyska
Imran, I have also observed the phenomenon of reducing the cores helping
with OOM. I wanted to ask this (hopefully without straying off topic): we
can specify the number of cores and the executor memory. But we don't get
to specify _how_ the cores are spread among executors.

Is it possible that with 24G memory and 4 cores we get a spread of 1 core
per executor thus ending up with 24G for the task, but with 24G memory and
10 cores some executor ends up with 3 cores on the same machine and thus we
have only 8G per task?

On Thu, Feb 26, 2015 at 4:42 PM, Imran Rashid  wrote:

> Hi Yong,
>
> mostly correct except for:
>
>>
>>- Since we are doing reduceByKey, shuffling will happen. Data will be
>>shuffled into 1000 partitions, as we have 1000 unique keys.
>>
>> no, you will not get 1000 partitions.  Spark has to decide how many
> partitions to use before it even knows how many unique keys there are.  If
> you have 200 as the default parallelism (or you just explicitly make it the
> second parameter to reduceByKey()), then you will get 200 partitions.  The
> 1000 unique keys will be distributed across the 200 partitions.  ideally
> they will be distributed pretty equally, but how they get distributed
> depends on the partitioner (by default you will have a HashPartitioner, so
> it depends on the hash of your keys).
>
> Note that this is more or less the same as in Hadoop MapReduce.
>
> the amount of parallelism matters b/c there are various places in spark
> where there is some overhead proportional to the size of a partition.  So
> in your example, if you have 1000 unique keys in 200 partitions, you expect
> about 5 unique keys per partitions -- if instead you had 10 partitions,
> you'd expect 100 unique keys per partitions, and thus more data and you'd
> be more likely to hit an OOM.  But there are many other possible sources of
> OOM, so this is definitely not the *only* solution.
>
> Sorry I can't comment in particular about Spark SQL -- hopefully somebody
> more knowledgeable can comment on that.
>
>
>
> On Wed, Feb 25, 2015 at 8:58 PM, java8964  wrote:
>
>> Hi, Sparkers:
>>
>> I come from the Hadoop MapReducer world, and try to understand some
>> internal information of spark. From the web and this list, I keep seeing
>> people talking about increase the parallelism if you get the OOM error. I
>> tried to read document as much as possible to understand the RDD partition,
>> and parallelism usage in the spark.
>>
>> I understand that for RDD from HDFS, by default, one partition will be
>> one HDFS block, pretty straightforward. I saw that lots of RDD operations
>> support 2nd parameter of parallelism. This is the part confuse me. From my
>> understand, the parallelism is totally controlled by how many cores you
>> give to your job. Adjust that parameter, or "spark.default.parallelism"
>> shouldn't have any impact.
>>
>> For example, if I have a 10G data in HDFS, and assume the block size is
>> 128M, so we get 100 blocks/partitions in RDD. Now if I transfer that RDD to
>> a Pair RDD, with 1000 unique keys in the pair RDD, and doing reduceByKey
>> action, using 200 as the default parallelism. Here is what I assume:
>>
>>
>>- We have 100 partitions, as the data comes from 100 blocks. Most
>>likely the spark will generate 100 tasks to read and shuffle them?
>>- The 1000 unique keys mean the 1000 reducer group, like in MR
>>- If I set the max core to be 50, so there will be up to 50 tasks can
>>be run concurrently. The rest tasks just have to wait for the core, if
>>there are 50 tasks are running.
>>- Since we are doing reduceByKey, shuffling will happen. Data will be
>>shuffled into 1000 partitions, as we have 1000 unique keys.
>>- I don't know these 1000 partitions will be processed by how many
>>tasks, maybe this is the parallelism parameter comes in?
>>- No matter what parallelism this will be, there are ONLY 50 task can
>>be run concurrently. So if we set more cores, more partitions' data will 
>> be
>>processed in the executor (which runs more thread in this case), so more
>>memory needs. I don't see how increasing parallelism could help the OOM in
>>this case.
>>- In my test case of Spark SQL, I gave 24G as the executor heap, my
>>join between 2 big datasets keeps getting OOM. I keep increasing the
>>"spark.default.parallelism", from 200 to 400, to 2000, even to 4000, no
>>help. What really makes the query finish finally without OOM is after I
>>change the "--total-executor-cores" from 10 to 4.
>>
>>
>> So my questions are:
>> 1) What is the parallelism really mean in the Spark? In the simple
>> example above, for reduceByKey, what difference it is between parallelism
>> change from 10 to 20?
>> 2) When we talk about partition in the spark, for the data coming from
>> HDFS, I can understand the partition clearly. For the intermediate data,
>> the partition will be same as key, right? For group, reducing, join action,

Re: Integrating Spark Streaming with Reactive Mongo

2015-02-26 Thread Tathagata Das
Hey Mike,

I quickly looked through the example and I found major performance issue.
You are collecting the RDDs to the driver and then sending them to Mongo in
a foreach. Why not doing a distributed push to Mongo?

WHAT YOU HAVE
val mongoConnection = ...

WHAT YOU SHUOLD DO

rdd.foreachPartition { iterator =>
   val connection = createConnection()
   iterator.foreach { ... push partition using connection ...  }
}


On Thu, Feb 26, 2015 at 1:25 PM, Mike Trienis 
wrote:

> Hi All,
>
> I have Spark Streaming setup to write data to a replicated MongoDB
> database and would like to understand if there would be any issues using
> the Reactive Mongo library to write directly to the mongoDB? My stack is
> Apache Spark sitting on top of Cassandra for the datastore, so my thinking
> is that the MongoDB connector for Hadoop will not be particular useful for
> me since I'm not using HDFS? Is there anything that I'm missing?
>
> Here is an example of code that I'm planning on using as a starting point
> for my implementation.
>
> LogAggregator
> 
>
> Thanks, Mike.
>


Re: [SparkSQL, Spark 1.2] UDFs in group by broken?

2015-02-26 Thread Yin Huai
Seems you hit https://issues.apache.org/jira/browse/SPARK-4296. It has been
fixed in 1.2.1 and 1.3.

On Thu, Feb 26, 2015 at 1:22 PM, Yana Kadiyska 
wrote:

> Can someone confirm if they can run UDFs in group by in spark1.2?
>
> I have two builds running -- one from a custom build from early December
> (commit 4259ca8dd12) which works fine, and Spark1.2-RC2.
>
> On the latter I get:
>
>  jdbc:hive2://XXX.208:10001> select 
> from_unixtime(epoch,'-MM-dd-HH'),count(*) count
> . . . . . . . . . . . . . . . . . .> from tbl
> . . . . . . . . . . . . . . . . . .> group by 
> from_unixtime(epoch,'-MM-dd-HH');
> Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
> Expression not in GROUP BY: 
> HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,-MM-dd-HH)
>  AS _c0#1004, tree:
> Aggregate 
> [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,-MM-dd-HH)],
>  
> [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,-MM-dd-HH)
>  AS _c0#1004,COUNT(1) AS count#1003L]
>  MetastoreRelation default, tbl, None (state=,code=0)
>
> ​
>
> This worked fine on my older build. I don't see a JIRA on this but maybe
> I'm not looking right. Can someone please advise?
>
>
>
>


  1   2   >