Re: many-to-many join

2015-07-22 Thread Sonal Goyal
If I understand this correctly, you could join area_code_user and
area_code_state and then flat map to get
user, areacode, state. Then groupby/reduce by user.

You can also try some join optimizations like partitioning on area code or
broadcasting smaller table depending on size of area_code_state.
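
Roughly, in Scala (a sketch with illustrative sample values; the same shape
works in PySpark):

// (area_code, users) joined with (area_code, state), then re-keyed by user
val areaCodeUsers = sc.parallelize(Seq((615, Seq(1234567L, 7654321L)), (510, Seq(1234567L))))
val areaCodeState = sc.parallelize(Seq((615, "Tennessee"), (510, "California")))

val userToStates = areaCodeUsers
  .join(areaCodeState)                                       // (area_code, (users, state))
  .flatMap { case (_, (users, state)) => users.map(u => (u, List(state))) }
  .reduceByKey(_ ++ _)                                       // (user_id, list of states)

userToStates.collect().foreach(println)                      // e.g. (1234567,List(Tennessee, California))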
On Jul 22, 2015 10:15 AM, "John Berryman"  wrote:

> Quick example problem that's stumping me:
>
> * Users have 1 or more phone numbers and therefore one or more area codes.
> * There are 100M users.
> * States have one or more area codes.
> * I would like to get the states for the users (as indicated by phone area
> code).
>
> I was thinking about something like this:
>
> If area_code_user looks like (area_code,[user_id]) ex: (615,[1234567])
> and area_code_state looks like (area_code,state) ex: (615, ["Tennessee"])
> then we could do
>
> states_and_users_mixed = area_code_user.join(area_code_state) \
> .reduceByKey(lambda a,b: a+b) \
> .values()
>
> user_state_pairs = states_and_users_mixed.flatMap(
> emit_cartesian_prod_of_userids_and_states )
> user_to_states =   user_state_pairs.reduceByKey(lambda a,b: a+b)
>
> user_to_states.first(1)
>
> >>> (1234567,["Tennessee","Tennessee","California"])
>
> This would work, but the user_state_pairs is just a list of user_ids and
> state names mixed together and emit_cartesian_prod_of_userids_and_states
> has to correctly pair them. This is problematic because 1) it's weird and
> sloppy and 2) there will be lots of users per state and having so many
> users in a single row is going to make
> emit_cartesian_prod_of_userids_and_states work extra hard to first locate
> states and then emit all userid-state pairs.
>
> How should I be doing this?
>
> Thanks,
> -John
>


Re: Spark spark.shuffle.memoryFraction has no affect

2015-07-22 Thread Andrew Or
Hi,

The setting of 0.2 / 0.6 looks reasonable to me. Since you are not using
caching at all, have you tried something more extreme, like 0.1 /
0.9? Since disabling spark.shuffle.spill didn't cause an OOM this setting
should be fine. Also, one thing you could do is to verify the shuffle bytes
spilled on the UI before and after the change.
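
If it's easier to experiment per job than editing spark-defaults.conf, the same
keys can be set on the SparkConf (or via --conf on spark-submit), e.g. a quick
sketch in Scala:

val conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.1")
  .set("spark.shuffle.memoryFraction", "0.9")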

Let me know if that helped.
-Andrew

2015-07-21 13:50 GMT-07:00 wdbaruni :

> Hi
> I am testing Spark on Amazon EMR using Python and the basic wordcount
> example shipped with Spark.
>
> After running the application, I realized that in Stage 0 reduceByKey(add),
> around 2.5GB shuffle is spilled to memory and 4GB shuffle is spilled to
> disk. Since in the wordcount example I am not caching or persisting any
> data, I thought I could increase the performance of this application by
> giving it more shuffle memoryFraction. So, in spark-defaults.conf, I added the
> following:
>
> spark.storage.memoryFraction    0.2
> spark.shuffle.memoryFraction    0.6
>
> However, I am still getting the same performance and the same amount of
> shuffle data is being spilled to disk and memory. I validated that Spark is
> reading these configurations using Spark UI/Environment and I can see my
> changes. Moreover, I tried setting spark.shuffle.spill to false and I got
> the performance I am looking for and all shuffle data was spilled to memory
> only.
>
> So, what am I getting wrong here, and why is the extra shuffle memory
> fraction not utilized?
>
> *My environment:*
> Amazon EMR with Spark 1.3.1 running using -x argument
> 1 Master node: m3.xlarge
> 3 Core nodes: m3.xlarge
> Application: wordcount.py
> Input: 10 .gz files 90MB each (~350MB unarchived) stored in S3
>
> *Submit command:*
> /home/hadoop/spark/bin/spark-submit --deploy-mode client /mnt/wordcount.py
> s3n://
>
> *spark-defaults.conf:*
> spark.eventLog.enabled  false
> spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
> spark.driver.extraJavaOptions   -Dspark.driver.log.level=INFO
> spark.master                    yarn
> spark.executor.instances        3
> spark.executor.cores            4
> spark.executor.memory           9404M
> spark.default.parallelism       12
> spark.eventLog.enabled          true
> spark.eventLog.dir              hdfs:///spark-logs/
> spark.storage.memoryFraction    0.2
> spark.shuffle.memoryFraction    0.6
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-spark-shuffle-memoryFraction-has-no-affect-tp23944.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to share a Map among RDDS?

2015-07-22 Thread Andrew Or
Hi Dan,

If the map is small enough, you can just broadcast it, can't you? It
doesn't have to be an RDD. Here's an example of broadcasting an array and
using it on the executors:
https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
.
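
A minimal sketch of the same idea, assuming a SparkContext named sc and data
shaped like yours:

// broadcast a plain Scala Map and look it up inside a transformation
val map1  = Map("aa" -> 1, "bb" -> 2, "cc" -> 3)
val bcMap = sc.broadcast(map1)

val vecs = sc.parallelize(Seq(Seq(("aa", 10.0), ("cc", 20.0)), Seq(("bb", 5.0), ("zz", 1.0))))
val matches = vecs.map(term => term.collect {
  // keep only keys present in the broadcast map, replacing the key with its value
  case (a, b) if bcMap.value.contains(a) => (bcMap.value(a), b)
})
matches.collect().foreach(println)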

-Andrew

2015-07-21 19:56 GMT-07:00 ayan guha :

> Either you have to do rdd.collect and then broadcast or you can do a join
> On 22 Jul 2015 07:54, "Dan Dong"  wrote:
>
>> Hi, All,
>>
>>
>> I am trying to access a Map from RDDs that are on different compute
>> nodes, but without success. The Map is like:
>>
>> val map1 = Map("aa"->1,"bb"->2,"cc"->3,...)
>>
>> All RDDs will have to check against it to see if the key is in the Map or
>> not, so it seems I have to make the Map itself global. The problem is that if
>> the Map is stored as RDDs and spread across the different nodes, each node
>> will only see a piece of the Map, and the info will not be complete to check
>> against the Map (and then replace the key with the corresponding value), e.g.:
>>
>> val matchs= Vecs.map(term=>term.map{case (a,b)=>(map1(a),b)})
>>
>> But if the Map is not an RDD, how do I share it, e.g. like sc.broadcast(map1)?
>>
>> Any idea about this? Thanks!
>>
>>
>> Cheers,
>> Dan
>>
>>


Re: Does Spark streaming support is there with RabbitMQ

2015-07-22 Thread Abel Rincón
Hi,

We tested this receiver internally in Stratio Sparkta, and it works fine.
If you try the receiver, we're open to your collaboration; your issues
will be welcome.

Regards

A.Rincón
Stratio software architect



2015-07-22 8:15 GMT+02:00 Tathagata Das :

> You could contact the authors of the spark-packages.. maybe that will help?
>
> On Mon, Jul 20, 2015 at 6:41 AM, Jeetendra Gangele 
> wrote:
>
>> Thanks Todd,
>>
>> I'm not sure whether somebody has used it or not. Can somebody confirm whether
>> this integrates nicely with Spark Streaming?
>>
>>
>>
>> On 20 July 2015 at 18:43, Todd Nist  wrote:
>>
>>> There is one package available on the spark-packages site,
>>>
>>> http://spark-packages.org/package/Stratio/RabbitMQ-Receiver
>>>
>>> The source is here:
>>>
>>> https://github.com/Stratio/RabbitMQ-Receiver
>>>
>>> Not sure that meets your needs or not.
>>>
>>> -Todd
>>>
>>> On Mon, Jul 20, 2015 at 8:52 AM, Jeetendra Gangele >> > wrote:
>>>
 Does Apache Spark support RabbitMQ? I have messages on RabbitMQ and I
 want to process them using Apache Spark Streaming. Does it scale?

 Regards
 Jeetendra

>>>
>>>
>>
>>
>


Re: Which memory fraction is Spark using to compute RDDs that are not going to be persisted

2015-07-22 Thread Andrew Or
Hi,

It would be whatever's left in the JVM. This is not explicitly controlled
by a fraction like storage or shuffle. However, the computation usually
doesn't need to use that much space. In my experience it's almost always
the caching or the aggregation during shuffles that's the most memory
intensive.

-Andrew

2015-07-21 13:47 GMT-07:00 wdbaruni :

> I am new to Spark and I understand that Spark divides the executor memory
> into the following fractions:
>
> *RDD Storage:* Which Spark uses to store persisted RDDs using .persist() or
> .cache() and can be defined by setting spark.storage.memoryFraction
> (default
> 0.6)
>
> *Shuffle and aggregation buffers:* Which Spark uses to store shuffle
> outputs. It can be defined using spark.shuffle.memoryFraction. If shuffle
> output exceeds this fraction, then Spark will spill data to disk (default
> 0.2)
>
> *User code:* Spark uses this fraction to execute arbitrary user code
> (default 0.2)
>
> I am not mentioning the storage and shuffle safety fractions for
> simplicity.
>
> My question is, which memory fraction is Spark using to compute and
> transform RDDs that are not going to be persisted? For example:
>
> lines = sc.textFile("i am a big file.txt")
> count = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x,
> 1)).reduceByKey(add)
> count.saveAsTextFile("output")
>
> Here Spark will not load the whole file at once and will partition the
> input
> file and do all these transformations per partition in a single stage.
> However, which memory fraction will Spark use to load the partitioned lines
> and compute flatMap() and map()?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Which-memory-fraction-is-Spark-using-to-compute-RDDs-that-are-not-going-to-be-persisted-tp23942.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to restart Twitter spark stream

2015-07-22 Thread Akhil Das
That was pseudo code; a working version would look like this:

val stream = TwitterUtils.createStream(ssc, None)

val hashTags = stream.flatMap(status => status.getText.split("
").filter(_.startsWith("#"))).map(x => (x.toLowerCase,1))

val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _,
Seconds(10))
  .map{case (topic, count) => (count, topic)}
  .transform(_.sortByKey(false)).map(x => x._2)

topCounts10.print()


val filteredStream = topCounts10.transform(rdd =>{
  val samplehashtags =
ssc.sparkContext.parallelize(Array("#RobinWilliams".toLowerCase,"#android".toLowerCase,"#iphone".toLowerCase))
  val newRDD = samplehashtags.map { x => (x,1) }
  val joined = newRDD.join(rdd)

  joined
})

filteredStream.print()
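
Regarding the error in the quoted code below ("value join is not a member of
RDD[twitter4j.Status]"): join is only defined on RDDs of key/value pairs, and
the hashtag array there was never parallelized into an RDD. A rough sketch of
filtering the raw statuses directly, inside the same transform(rdd => ...)
block (hashtags are illustrative):

val sampleHashtags = ssc.sparkContext
  .parallelize(Seq("#music", "#film"))
  .map(tag => (tag.toLowerCase, 1))
// key each status by the hashtags it contains so it can be joined
val keyedStatuses = rdd.flatMap(status =>
  status.getText.split(" ").filter(_.startsWith("#")).map(tag => (tag.toLowerCase, status)))
val filtered = sampleHashtags.join(keyedStatuses).values.map(_._2)   // matching statuses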

Thanks
Best Regards

On Wed, Jul 22, 2015 at 3:58 AM, Zoran Jeremic 
wrote:

> Hi Akhil and Jorn,
>
> I tried as you suggested to create some simple scenario, but I have an
> error on rdd.join(newRDD):  "value join is not a member of
> org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like:
>
> val stream = TwitterUtils.createStream(ssc, auth)
>> val filteredStream= stream.transform(rdd =>{
>> val samplehashtags=Array("music","film")
>> val newRDD= samplehashtags.map { x => (x,1) }
>> rdd.join(newRDD)
>>  })
>>
>
> Did I miss something here?
>
> Thanks,
> Zoran
>
> On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic 
> wrote:
>
>> Thanks for explanation.
>>
>> If I understand this correctly, in this approach I would actually stream
>> everything from Twitter, and perform filtering in my application using
>> Spark. Isn't this too much overhead if my application is interested in
>> listening for a couple of hundred or thousand hashtags?
>> On one side, this will be a better approach since I will not have the
>> problem of opening new streams if the number of hashtags goes over 400, which is the
>> Twitter limit for user stream filtering; but on the other side I'm concerned
>> about how much it will affect application performance if I stream
>> everything that is posted on Twitter and filter it locally. It would be
>> great if somebody with experience on this could comment on these concerns.
>>
>> Thanks,
>> Zoran
>>
>> On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das 
>> wrote:
>>
>>> Jorn meant something like this:
>>>
>>> val filteredStream = twitterStream.transform(rdd =>{
>>>
>>> val newRDD =
>>> scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x,1))
>>>
>>> rdd.join(newRDD)
>>>
>>> })
>>>
>>> ​newRDD will work like a filter when you do the join.​
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic 
>>> wrote:
>>>
 Hi Jorn,

 I didn't know that it is possible to change filter without re-opening
 twitter stream. Actually, I already had that question earlier at the
 stackoverflow
 
 and I got the answer that it's not possible, but it would be even better if
 there is some other way to add new hashtags or to remove old hashtags that
 user stopped following. I guess the second request would be more difficult.

 However, it would be great if you can give me some short example how to
 make this. I didn't understand well from your explanation what you mean by
 "join it with a rdd loading the newest hash tags from disk in a regular
 interval".

 Thanks,
 Zoran

 On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke 
 wrote:

> Why do you even want to stop it? You can join it with a rdd loading
> the newest hash tags from disk in a regular interval
>
> Le dim. 19 juil. 2015 à 7:40, Zoran Jeremic 
> a écrit :
>
>> Hi,
>>
>> I have a twitter spark stream initialized in the following way:
>>
>>   val ssc:StreamingContext =
>>> SparkLauncher.getSparkScalaStreamingContext()
>>>   val config = getTwitterConfigurationBuilder.build()
>>>   val auth: Option[twitter4j.auth.Authorization] =
>>> Some(new
>>>
>>> twitter4j.auth.OAuthAuthorization(config))
>>>   val stream = TwitterUtils.createStream(ssc, auth,
>>> filters)
>>>
>>
>> This works fine when I initialy start it. However, at some point I
>> need to update filters since users might add new hashtags they want to
>> follow. I tried to stop the running stream and spark streaming context
>> without stoping spark context, e.g:
>>
>>
>>>stream.stop()
>>>ssc.stop(false)
>>>
>>
>> Afterward, I'm trying to initialize a new Twitter stream like I did
>> previously. However, I got this exception:
>>
>> Exception in thread "Firestorm JMX Monitor"
>>> java.lang.IllegalStateException: Adding new inputs, transformations, and
>>> output operations after stopping a conte

Use directories for partition pruning Spark SQL

2015-07-22 Thread Johan Lundahl
Hi,

I have data files (json in this example but could also be avro) written in
a directory structure like:

dataroot
+-- year=2015
    +-- month=06
        +-- day=01
            +-- data1.json
            +-- data2.json
            +-- data3.json
        +-- day=02
            +-- data1.json
            +-- data2.json
            +-- data3.json
    +-- month=07
        +-- day=20
            +-- data1.json
            +-- data2.json
            +-- data3.json
        +-- day=21
            +-- data1.json
            +-- data2.json
            +-- data3.json
        +-- day=22
            +-- data1.json
            +-- data2.json

Using spark-sql I create a temporary table:

CREATE TEMPORARY TABLE dataTable
USING org.apache.spark.sql.json
OPTIONS (
  path "dataroot/*"
)

Querying the table works well, but so far I'm not able to use the
directories for pruning.
Is there a way to register the directory structure as partitions (without
using Hive) to avoid scanning the whole tree when I query, say when I want to
compare data for the first day of each month?

Thanks,
Johan


Mesos + Spark

2015-07-22 Thread boci
Hi guys!

I'm new to mesos. I have two spark applications (one streaming and one
batch). I want to run both apps in a mesos cluster. Now for testing I want to
run in docker containers, so I started a simple redjack/mesos-master, but I
think a lot of things are unclear to me (both mesos and spark-mesos).

If I have a mesos cluster (for testing it will be some docker containers), do I
need a separate machine (container) to run my spark job? Or can I submit to
the cluster and schedule it (chronos or I dunno)?
How can I run the streaming job? What happens if the "controller" dies? Or
if I call spark-submit with master=mesos, is my application started and can I
forget about it? How can I run it every 10 min without submitting every 10 min? How
can I run my streaming app in HA mode?

Thanks

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: Spark-hive parquet schema evolution

2015-07-22 Thread Cheng Lian
Since Hive doesn’t support schema evolution, you’ll have to update the
schema stored in metastore somehow. For example, you can create a new
external table with the merged schema. Say you have a Hive table t1:

CREATE TABLE t1 (c0 INT, c1 DOUBLE);

By default, this table is stored in HDFS path
hdfs://some-host:9000/user/hive/warehouse/t1. Now you append some
Parquet data with an extra column c2 to the same directory:

import org.apache.spark.sql.types._

val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
val df1 = sqlContext.range(10).select('id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
df1.write.mode("append").parquet(path)

Now you can create a new external table t2 like this:

val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
df2.write.path(path).saveAsTable("t2")


Since we specified a path above, the newly created t2 is an external
table pointing to the original HDFS location. But the schema of t2 is
the merged version.


The drawback of this approach is that t2 is actually a Spark SQL
specific data source table rather than a genuine Hive table. This means
it can be accessed by Spark SQL only. We’re just using Hive metastore to
help persist metadata of the data source table. However, since you’re
asking how to access the new table via Spark SQL CLI, this should work
for you. We are working on making Parquet and ORC data source tables
accessible via Hive in Spark 1.5.0.


Cheng

On 7/22/15 10:32 AM, Jerrick Hoang wrote:


Hi Lian,

Sorry, I'm new to Spark so I did not express myself very clearly. I'm
concerned about the situation when, let's say, I have a Parquet table with
some partitions and I add a new column A to the parquet schema and write
some data with the new schema to a new partition in the table. If I'm
not mistaken, if I do a
sqlContext.read.parquet(table_path).printSchema() it will print the
correct schema with new column A. But if I do a 'describe table' from
SparkSQLCLI I won't see the new column being added. I understand that
this is because Hive doesn't support schema evolution. So what is the
best way to support CLI queries in this situation? Do I need to
manually alter the table every time the underlying schema changes?


Thanks

On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian > wrote:


Hey Jerrick,

What do you mean by "schema evolution with Hive metastore tables"?
Hive doesn't take schema evolution into account. Could you please
give a concrete use case? Are you trying to write Parquet data
with extra columns into an existing metastore Parquet table?

Cheng


On 7/21/15 1:04 AM, Jerrick Hoang wrote:

I'm new to Spark, any ideas would be much appreciated! Thanks

On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang
mailto:jerrickho...@gmail.com>> wrote:

Hi all,

I'm aware of the support for schema evolution via DataFrame
API. Just wondering what would be the best way to go about
dealing with schema evolution with Hive metastore tables. So,
say I create a table via SparkSQL CLI, how would I deal with
Parquet schema evolution?

Thanks,
J






​


Re: many-to-many join

2015-07-22 Thread ayan guha
Hi

RDD solution:
>>> u = [(615,1),(720,1),(615,2)]
>>> urdd=sc.parallelize(u,1)
>>> a1 = [(615,'T'),(720,'C')]
>>> ardd=sc.parallelize(a1,1)
>>> def addString(s1,s2):
... return s1+','+s2
>>> j = urdd.join(ardd).map(lambda t:t[1]).reduceByKey(addString)
>>> print j.collect()
[(2, 'T'), (1, 'C,T')]

However, if you can assume the number of users is far, far greater than
the number of area codes, you may think of broadcasting the smaller table in a
dict format and looking it up in the map. Like this:

>>> u = [(1,615),(1,720),(2,615)]
>>> a = {615:'T',720:'C'}
>>> urdd=sc.parallelize(u)
>>> def usr_area_state(tup):
... uid=tup[0]
... aid=tup[1]
... sid=bc.value[aid]
... return uid,(sid,)
...
>>> bc=sc.broadcast(a)
>>> usrdd=urdd.map(usr_area_state)
>>> def addTuple(t1,t2):
... return t1+t2
...
>>> out=usrdd.reduceByKey(addTuple)
>>> print out.collect()
[(1, ('T', 'C')), (2, ('T',))]

Best
Ayan

On Wed, Jul 22, 2015 at 5:14 PM, Sonal Goyal  wrote:

> If I understand this correctly, you could join area_code_user and
> area_code_state and then flat map to get
> user, areacode, state. Then groupby/reduce by user.
>
> You can also try some join optimizations like partitioning on area code or
> broadcasting smaller table depending on size of area_code_state.
> On Jul 22, 2015 10:15 AM, "John Berryman"  wrote:
>
>> Quick example problem that's stumping me:
>>
>> * Users have 1 or more phone numbers and therefore one or more area codes.
>> * There are 100M users.
>> * States have one or more area codes.
> >> * I would like to get the states for the users (as indicated by phone area
>> code).
>>
>> I was thinking about something like this:
>>
>> If area_code_user looks like (area_code,[user_id]) ex: (615,[1234567])
>> and area_code_state looks like (area_code,state) ex: (615, ["Tennessee"])
>> then we could do
>>
>> states_and_users_mixed = area_code_user.join(area_code_state) \
>> .reduceByKey(lambda a,b: a+b) \
>> .values()
>>
>> user_state_pairs = states_and_users_mixed.flatMap(
>> emit_cartesian_prod_of_userids_and_states )
>> user_to_states =   user_state_pairs.reduceByKey(lambda a,b: a+b)
>>
>> user_to_states.first(1)
>>
>> >>> (1234567,["Tennessee","Tennessee","California"])
>>
>> This would work, but the user_state_pairs is just a list of user_ids and
>> state names mixed together and emit_cartesian_prod_of_userids_and_states
>> has to correctly pair them. This is problematic because 1) it's weird and
>> sloppy and 2) there will be lots of users per state and having so many
>> users in a single row is going to make
>> emit_cartesian_prod_of_userids_and_states work extra hard to first locate
>> states and then emit all userid-state pairs.
>>
>> How should I be doing this?
>>
>> Thanks,
>> -John
>>
>


-- 
Best Regards,
Ayan Guha


Is spark suitable for real time query

2015-07-22 Thread Louis Hust
Hi, all

I am using a Spark jar in standalone mode, fetching data from different MySQL
instances and doing some actions, but I found the time is at the second level.

So I want to know whether a Spark job is suitable for real-time queries that
need to return within microseconds?


Re: Is spark suitable for real time query

2015-07-22 Thread Robin East
Real-time is, of course, relative but you’ve mentioned microsecond level. Spark 
is designed to process large amounts of data in a distributed fashion. No 
distributed system I know of could give any kind of guarantees at the 
microsecond level.

Robin

> On 22 Jul 2015, at 11:14, Louis Hust  wrote:
> 
> Hi, all
> 
> I am using spark jar in standalone mode, fetch data from different mysql 
> instance and do some action, but i found the time is at second level.
> 
> So i want to know if spark job is suitable for real time query which at 
> microseconds?


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Scaling spark cluster for a running application

2015-07-22 Thread phagunbaya
I have a spark cluster running in client mode with the driver outside the spark
cluster. I want to scale the cluster after an application is submitted. In
order to do this, I'm creating new workers and they are getting registered
with the master, but the issue I'm seeing is that the running application does not use the
newly added worker. Hence I cannot add more resources to an existing running
application.

Is there any other way or config to deal with this use-case? How can I make a
running application ask for executors from a newly added worker node?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Scaling-spark-cluster-for-a-running-application-tp23951.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Proper saving/loading of MatrixFactorizationModel

2015-07-22 Thread PShestov
Hi all!
I have a MatrixFactorizationModel object. If I recommend products
to a single user right after constructing the model through ALS.train(...), it
takes 300ms (for my data and hardware). But if I save the model to disk and load
it back, then recommendation takes almost 2000ms. Also, Spark warns:
15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor does not have a
partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor is not cached.
Prediction could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor does not
have a partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor is not
cached. Prediction could be slow.
How can I create/set a partitioner and cache the user and product factors after
loading the model? The following approach didn't help:
model.userFeatures().cache();
model.productFeatures().cache();
Also, I tried to repartition those RDDs and create a new model from the
repartitioned versions, but that also didn't help.
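
(For reference, my repartition attempt was along these lines -- a rough Scala
sketch of the idea, not the exact code; numPartitions is illustrative:)

import org.apache.spark.HashPartitioner
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

val numPartitions   = 8
val userFeatures    = model.userFeatures.partitionBy(new HashPartitioner(numPartitions)).cache()
val productFeatures = model.productFeatures.partitionBy(new HashPartitioner(numPartitions)).cache()
userFeatures.count(); productFeatures.count()   // force the caches to materialize

val rebuilt = new MatrixFactorizationModel(model.rank, userFeatures, productFeatures)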



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Proper-saving-loading-of-MatrixFactorizationModel-tp23952.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Broadcast variables in R

2015-07-22 Thread FRANCHOIS Serge
Thank you very much Shivaram. I’ve got it working on Mac now by specifying the 
namespace.
Using SparkR:::parallelize() instead of just parallelize().

Wkr,
Serge



On 21 Jul 2015, at 17:20, Shivaram Venkataraman 
mailto:shiva...@eecs.berkeley.edu>> wrote:

There shouldn't be anything Mac OS specific about this feature. One point of 
warning though -- As mentioned previously in this thread the APIs were made 
private because we aren't sure we will be supporting them in the future. If you 
are using these APIs it would be good to chime in on the JIRA with your use-case

Thanks
Shivaram

On Tue, Jul 21, 2015 at 2:34 AM, Serge Franchois 
mailto:serge.franch...@altran.com>> wrote:
I might add to this that I've done the same exercise on Linux (CentOS 6) and
there, broadcast variables ARE working. Is this functionality perhaps not
exposed on Mac OS X?  Or has it to do with the fact there are no native
Hadoop libs for Mac?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-in-R-tp23914p23927.html
Sent from the Apache Spark User List mailing list archive at 
Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org





Re: Is spark suitable for real time query

2015-07-22 Thread Louis Hust
I did a simple test using Spark in standalone mode (not cluster),
and found a simple action takes a few seconds; the data size is small, just a
few rows.
So will each Spark job cost some time for init or prepare work no matter
what the job is?
I mean, will the basic framework of a Spark job cost seconds?

2015-07-22 19:17 GMT+08:00 Robin East :

> Real-time is, of course, relative but you’ve mentioned microsecond level.
> Spark is designed to process large amounts of data in a distributed
> fashion. No distributed system I know of could give any kind of guarantees
> at the microsecond level.
>
> Robin
>
> > On 22 Jul 2015, at 11:14, Louis Hust  wrote:
> >
> > Hi, all
> >
> > I am using spark jar in standalone mode, fetch data from different mysql
> instance and do some action, but i found the time is at second level.
> >
> > So i want to know if spark job is suitable for real time query which at
> microseconds?
>
>


R: Is spark suitable for real time query

2015-07-22 Thread Paolo Platter
Are you using jdbc server?

Paolo

Sent from my Windows Phone

From: Louis Hust
Sent: 22/07/2015 13:47
To: Robin East
Cc: user@spark.apache.org
Subject: Re: Is spark suitable for real time query

I do a simple test using spark in standalone mode(not cluster),
 and found a simple action take a few seconds, the data size is small, just few 
rows.
So each spark job will cost some time for init or prepare work no matter what 
the job is?
I mean if the basic framework of spark job will cost seconds?

2015-07-22 19:17 GMT+08:00 Robin East 
mailto:robin.e...@xense.co.uk>>:
Real-time is, of course, relative but you’ve mentioned microsecond level. Spark 
is designed to process large amounts of data in a distributed fashion. No 
distributed system I know of could give any kind of guarantees at the 
microsecond level.

Robin

> On 22 Jul 2015, at 11:14, Louis Hust 
> mailto:louis.h...@gmail.com>> wrote:
>
> Hi, all
>
> I am using spark jar in standalone mode, fetch data from different mysql 
> instance and do some action, but i found the time is at second level.
>
> So i want to know if spark job is suitable for real time query which at 
> microseconds?




Re: Is spark suitable for real time query

2015-07-22 Thread Igor Berman
You can use the Spark REST job server (or any other solution that provides a
long-running Spark context) so that you won't pay this bootstrap time on each
query.
In addition: if you have some RDD that you want your queries to be executed
on, you can cache this RDD in memory (depending on your cluster memory size) so
that you won't pay the reading-from-disk time.
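
For example, a rough sketch (Scala; the JDBC URL and table name are
illustrative):

// load once in the long-running context, cache, then serve repeated queries from memory
val props = new java.util.Properties()
val df = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/test", "t1", props)
df.registerTempTable("t1")
sqlContext.cacheTable("t1")                        // first query materializes the cache
sqlContext.sql("SELECT count(*) FROM t1").show()   // later queries skip the MySQL read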


On 22 July 2015 at 14:46, Louis Hust  wrote:

> I do a simple test using spark in standalone mode(not cluster),
>  and found a simple action take a few seconds, the data size is small,
> just few rows.
> So each spark job will cost some time for init or prepare work no matter
> what the job is?
> I mean if the basic framework of spark job will cost seconds?
>
> 2015-07-22 19:17 GMT+08:00 Robin East :
>
>> Real-time is, of course, relative but you’ve mentioned microsecond level.
>> Spark is designed to process large amounts of data in a distributed
>> fashion. No distributed system I know of could give any kind of guarantees
>> at the microsecond level.
>>
>> Robin
>>
>> > On 22 Jul 2015, at 11:14, Louis Hust  wrote:
>> >
>> > Hi, all
>> >
>> > I am using spark jar in standalone mode, fetch data from different
>> mysql instance and do some action, but i found the time is at second level.
>> >
>> > So i want to know if spark job is suitable for real time query which at
>> microseconds?
>>
>>
>


Applications metrics unseparatable from Master metrics?

2015-07-22 Thread Romi Kuntsman
Hi,

I tried to enable Master metrics source (to get number of running/waiting
applications etc), and connected it to Graphite.
However, when these are enabled, application metrics are also sent.

Is it possible to separate them, and send only master metrics without
applications?

I see that Master class is registering both:
https://github.com/apache/spark/blob/b9a922e260bec1b211437f020be37fab46a85db0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L91

Thanks,
RK.


Need help in SparkSQL

2015-07-22 Thread Jeetendra Gangele
HI All,

I have data in MongoDB (a few TBs) which I want to migrate to HDFS to do
complex query analysis on. The queries are things like AND queries involving
multiple fields.

So my question is: in which format should I store the data in HDFS so
that processing will be fast for such kinds of queries?


Regards
Jeetendra


Re: How to build Spark with my own version of Hadoop?

2015-07-22 Thread jay vyas
As you know, the hadoop versions and so on are available in the spark build
files; iirc the top-level pom.xml has all the maven variables for versions.

So I think if you just build hadoop locally (i.e. version it as
2.2.1234-SNAPSHOT and mvn install it), you should be able to change the
corresponding variable in the top-level spark pom.xml.


Of course this is a Pandora's box where now you need to also deploy your
custom YARN on your cluster, make sure it matches the spark target, and so
on (if you're running spark on YARN).  RPMs and DEB packages tend to be
useful for this kind of thing, since you can easily sync the /etc/ config
files and uniformly manage/upgrade versions etc.  ...  Thus... if you're
really serious about building a custom distribution, mixing & matching
hadoop components separately, you might want to consider using Apache
BigTop, just bring this up on that mailing list... We curate a hadoop
distribution "builder" that builds spark, hadoop, hive, ignite, kafka,
zookeeper, hbase and so on...  Since bigtop has all the tooling necessary
to fully build, test, and deploy on VMs/containers your hadoop bits, it
might make your life a little easier.



On Tue, Jul 21, 2015 at 11:11 PM, Dogtail Ray  wrote:

> Hi,
>
> I have modified some Hadoop code, and want to build Spark with the
> modified version of Hadoop. Do I need to change the compilation dependency
> files? How to then? Great thanks!
>



-- 
jay vyas


java.lang.IllegalArgumentException: problem reading type: type = group, name = param, original type = null

2015-07-22 Thread SkyFox
Hello!

I don't understand why, but I can't read data from my parquet file. I made a
parquet file from a json file and read it into a data frame:

df.printSchema()

 |-- param: struct (nullable = true)
 |    |-- FORM: string (nullable = true)
 |    |-- URL: string (nullable = true)

When I try to read any record I get an error:

df.select("param").first()

15/07/22 13:06:15 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 4)
java.lang.IllegalArgumentException: problem reading type: type = group, name
= param, original type = null
at
parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:132)
at
parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:106)
at
parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
at parquet.schema.MessageTypeParser.parse(MessageTypeParser.java:89)
at
parquet.schema.MessageTypeParser.parseMessageType(MessageTypeParser.java:79)
at
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:189)
at
parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:153)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: expected one of [REQUIRED,
OPTIONAL, REPEATED] got utm_medium at line 29: optional binary
amp;utm_medium
at
parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:203)
at
parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:101)
at
parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
at
parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:130)
... 24 more
Caused by: java.lang.IllegalArgumentException: No enum constant
parquet.schema.Type.Repetition.UTM_MEDIUM
at java.lang.Enum.valueOf(Enum.java:238)
at parquet.schema.Type$Repetition.valueOf(Type.java:70)
at
parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:201)
... 27 more
15/07/22 13:06:15 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 4,
localhost): java.lang.IllegalArgumentException: problem reading type: type =
group, name = param, original type = null
at
parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:132)
at
parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:106)
at
parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
at parquet.schema.MessageTypeParser.parse(MessageTypeParser.java:89)
at
parquet.schema.MessageTypeParser.parseMessageType(MessageTypeParser.java:79)
at
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:189)
at
parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:153)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD

Re: use S3-Compatible Storage with spark

2015-07-22 Thread Schmirr Wurst
I could get a little further:
- installed spark-1.4.1-without-hadoop
- unpacked hadoop 2.7.1
- added the following in spark-env.sh

HADOOP_HOME=/opt/hadoop-2.7.1/
SPARK_DIST_CLASSPATH=/opt/hadoop-2.7.1/opt/hadoop-2.7.1/share/hadoop/tools/lib/*/share/hadoop/tools/lib/*:/opt/hadoop-2.7.1/etc/hadoop:/opt/hadoop-2.7.1/share/hadoop/common/lib/*:/opt/had$

and start spark-shell with :
bin/spark-shell --jars
/opt/hadoop-2.7.1/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar

Now spark-shell is starting with
"spark.SparkContext: Added JAR
file:/opt/hadoop-2.7.1/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar at
http://185.19.29.91:46368/jars/hadoop-aws-2.7.1.jar with timestamp
1437575186830"

But when trying to access s3 I have
java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem:
Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be
instantiated

In fact it doesn't even matter whether I try to use s3n or s3a; the error is
the same (strange!)

2015-07-22 12:19 GMT+02:00 Thomas Demoor :
> You need to get the hadoop-aws.jar from hadoop-tools (use hadoop 2.7+) - you 
> can get the source and build with mvn or get it from prebuilt hadoop 
> distro's. Then when you run your spark job add --jars path/to/thejar
>
> 
> From: Schmirr Wurst 
> Sent: Wednesday, July 22, 2015 12:06 PM
> To: Thomas Demoor
> Subject: Re: use S3-Compatible Storage with spark
>
> Hi Thomas, thanks, could you just tell me what exaclty I need to do ?
> I'm not familiar with java programming
> - where do I get the jar from, do  I need to compile it with mvn ?
> - where should I update the classpath and how ?
>
>
>
> 2015-07-22 11:55 GMT+02:00 Thomas Demoor :
>> The classes are not found. Is the jar on your classpath?
>>
>> Take care: there are multiple s3 connectors in hadoop: the legacy s3n, based 
>> on a 3d party S3 lib Jets3t, and the recent (functional since hadoop 2.7)  
>> s3a based on the Amazon SDK. Make sure you stick to one: so use fs.s3a 
>> endpoint and url s3a://bucket/object or fs.s3n.endpoint and 
>> s3n://bucket/object. I recommend s3a but I'm biased :P
>>
>> Regards,
>> Thomas
>>
>> 
>> From: Schmirr Wurst 
>> Sent: Tuesday, July 21, 2015 11:59 AM
>> To: Akhil Das
>> Cc: user@spark.apache.org
>> Subject: Re: use S3-Compatible Storage with spark
>>
>> Which version do you have ?
>>
>> - I tried with spark 1.4.1 for hdp 2.6, but here I had an issue that
>> the aws-module is not there somehow:
>> java.io.IOException: No FileSystem for scheme: s3n
>> the same for s3a :
>> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
>> org.apache.hadoop.fs.s3a.S3AFileSystem not found
>>
>> - On Spark 1.4.1 for hdp 2.4 , the module is there, and works out of
>> the box for S3n (but for the endpoint)
>> But I have "java.io.IOException: No FileSystem for scheme: s3a"
>>
>> :-|
>>
>> 2015-07-21 11:09 GMT+02:00 Akhil Das :
>>> Did you try with s3a? It seems its more like an issue with hadoop.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Jul 21, 2015 at 2:31 PM, Schmirr Wurst 
>>> wrote:

 It seems to work for the credentials , but the endpoint is ignored.. :
 I've changed it to
 sc.hadoopConfiguration.set("fs.s3n.endpoint","test.com")

 And I continue to get my data from amazon, how could it be ? (I also
 use s3n in my text url)

 2015-07-21 9:30 GMT+02:00 Akhil Das :
 > You can add the jar in the classpath, and you can set the property like:
 >
 > sc.hadoopConfiguration.set("fs.s3a.endpoint","storage.sigmoid.com")
 >
 >
 >
 > Thanks
 > Best Regards
 >
 > On Mon, Jul 20, 2015 at 9:41 PM, Schmirr Wurst 
 > wrote:
 >>
 >> Thanks, that is what I was looking for...
 >>
 >> Any Idea where I have to store and reference the corresponding
 >> hadoop-aws-2.6.0.jar ?:
 >>
 >> java.io.IOException: No FileSystem for scheme: s3n
 >>
 >> 2015-07-20 8:33 GMT+02:00 Akhil Das :
 >> > Not in the uri, but in the hadoop configuration you can specify it.
 >> >
 >> > <property>
 >> >   <name>fs.s3a.endpoint</name>
 >> >   <description>AWS S3 endpoint to connect to. An up-to-date list is
 >> > provided in the AWS Documentation: regions and endpoints. Without this
 >> > property, the standard region (s3.amazonaws.com) is assumed.
 >> >   </description>
 >> > </property>
 >> >
 >> >
 >> > Thanks
 >> > Best Regards
 >> >
 >> > On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst
 >> > 
 >> > wrote:
 >> >>
 >> >> I want to use pithos, were do I can specify that endpoint, is it
 >> >> possible in the url ?
 >> >>
 >> >> 2015-07-19 17:22 GMT+02:00 Akhil Das :
 >> >> > Could you name the Storage service that you are using? Most of
 >> >> > them
 >> >> > provides
 >> >> > a S3 like RestAPI endpoint for you to hit.
 >> >> >
 >> >> > Thanks
 >> >> > Best Regards
 >> >> >
 >> >> > On Fri, Jul 

Re: Scaling spark cluster for a running application

2015-07-22 Thread Romi Kuntsman
Are you running the Spark cluster in standalone or YARN?
In standalone, the application gets the available resources when it starts.
With YARN, you can try to turn on the setting
*spark.dynamicAllocation.enabled*
See https://spark.apache.org/docs/latest/configuration.html
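
A minimal sketch of the relevant settings (dynamic allocation also needs the
external shuffle service to be enabled):

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")   // required by dynamic allocation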

On Wed, Jul 22, 2015 at 2:20 PM phagunbaya  wrote:

> I have a spark cluster running in client mode with driver outside the spark
> cluster. I want to scale the cluster after an application is submitted. In
> order to do this, I'm creating new workers and they are getting registered
> with master but issue I'm seeing is; running application does not use the
> newly added worker. Hence cannot add more resources to existing running
> application.
>
> Is there any other way or config to deal with this use-case ? How to make
> running application to ask for executors from newly issued worker node ?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Scaling-spark-cluster-for-a-running-application-tp23951.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Parquet problems

2015-07-22 Thread Michael Misiewicz
Hi Anders,

Did you ever get to the bottom of this issue? I'm encountering it too, but
only in "yarn-cluster" mode running on spark 1.4.0. I was thinking of
trying 1.4.1 today.

Michael

On Thu, Jun 25, 2015 at 5:52 AM, Anders Arpteg  wrote:

> Yes, both the driver and the executors. Works a little bit better with
> more space, but still a leak that will cause failure after a number of
> reads. There are about 700 different data sources that needs to be loaded,
> lots of data...
>
> tor 25 jun 2015 08:02 Sabarish Sasidharan 
> skrev:
>
>> Did you try increasing the perm gen for the driver?
>>
>> Regards
>> Sab
>> On 24-Jun-2015 4:40 pm, "Anders Arpteg"  wrote:
>>
>>> When reading large (and many) datasets with the Spark 1.4.0 DataFrames
>>> parquet reader (the org.apache.spark.sql.parquet format), the following
>>> exceptions are thrown:
>>>
>>> Exception in thread "task-result-getter-0"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "task-result-getter-0"
>>> Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError:
>>> PermGen space
>>> Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError:
>>> PermGen space
>>> Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError:
>>> PermGen space
>>>
>>> and many more like these from different threads. I've tried increasing
>>> the PermGen space using the -XX:MaxPermSize VM setting, but even after
>>> tripling the space, the same errors occur. I've also tried storing
>>> intermediate results, and am able to get the full job completed by running
>>> it multiple times and starting for the last successful intermediate result.
>>> There seems to be some memory leak in the parquet format. Any hints on how
>>> to fix this problem?
>>>
>>> Thanks,
>>> Anders
>>>
>>


Re: Mesos + Spark

2015-07-22 Thread Dean Wampler
This page, http://spark.apache.org/docs/latest/running-on-mesos.html,
covers many of these questions. If you submit a job with the option
"--supervise", it will be restarted if it fails.

You can use Chronos for scheduling. You can create a single streaming job
with a 10 minute batch interval, if that works for your every 10-min. need.
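
For the streaming case, a minimal sketch in Scala (assumes an existing
SparkConf):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

// one long-running streaming job with a 10-minute batch interval
val conf = new SparkConf().setAppName("every-10-min")
val ssc  = new StreamingContext(conf, Minutes(10))
// ... define sources and output operations here ...
ssc.start()
ssc.awaitTermination()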

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 3:53 AM, boci  wrote:

> Hi guys!
>
> I'm a new in mesos. I have two spark application (one streaming and one
> batch). I want to run both app in mesos cluster. Now for testing I want to
> run in docker container so I started a simple redjack/mesos-master, but I
> think a lot of think unclear for me (both mesos and spark-mesos).
>
> If I have a mesos cluster (for testing it will be some docker container) i
> need a separate machine (container) to run my spark job? Or can I submit
> the cluster and schedule (chronos or I dunno)?
> How can I run the streaming job? What happened if the "controller" died?
> Or if I call spark-submit with master=mesos my application started and I
> can forget? How can I run in every 10 min without submit in every 10 min?
> How can I run my streaming app in HA mode?
>
> Thanks
>
> b0c1
>
>
> --
> Skype: boci13, Hangout: boci.b...@gmail.com
>


Re: Spark-hive parquet schema evolution

2015-07-22 Thread Dean Wampler
While it's not recommended to overwrite files Hive thinks it understands,
you can add the column to Hive's metastore using an ALTER TABLE command
using HiveQL in the Hive shell or using HiveContext.sql():

ALTER TABLE mytable ADD COLUMNS (col_name data_type)

See
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AlterTable/Partition/Column
for full details.
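
For example, from Spark, a minimal sketch (assuming a HiveContext named
hiveContext and the c2 column from Cheng's example):

// add the new column to the metastore schema so CLI queries can see it
hiveContext.sql("ALTER TABLE t1 ADD COLUMNS (c2 STRING)")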

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 4:36 AM, Cheng Lian  wrote:

>  Since Hive doesn’t support schema evolution, you’ll have to update the
> schema stored in metastore somehow. For example, you can create a new
> external table with the merged schema. Say you have a Hive table t1:
>
> CREATE TABLE t1 (c0 INT, c1 DOUBLE);
>
> By default, this table is stored in HDFS path
> hdfs://some-host:9000/user/hive/warehouse/t1. Now you append some Parquet
> data with an extra column c2 to the same directory:
>
> import org.apache.spark.sql.types._
> val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
> val df1 = sqlContext.range(10).select('id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
> df1.write.mode("append").parquet(path)
>
> Now you can create a new external table t2 like this:
>
> val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
> df2.write.path(path).saveAsTable("t2")
>
> Since we specified a path above, the newly created t2 is an external
> table pointing to the original HDFS location. But the schema of t2 is the
> merged version.
>
> The drawback of this approach is that, t2 is actually a Spark SQL
> specific data source table rather than a genuine Hive table. This means, it
> can be accessed by Spark SQL only. We’re just using Hive metastore to help
> persisting metadata of the data source table. However, since you’re asking
> how to access the new table via Spark SQL CLI, this should work for you. We
> are working on making Parquet and ORC data source tables accessible via
> Hive in Spark 1.5.0.
>
> Cheng
>
> On 7/22/15 10:32 AM, Jerrick Hoang wrote:
>
>   Hi Lian,
>
>  Sorry I'm new to Spark so I did not express myself very clearly. I'm
> concerned about the situation when let's say I have a Parquet table some
> partitions and I add a new column A to parquet schema and write some data
> with the new schema to a new partition in the table. If i'm not mistaken,
> if I do a sqlContext.read.parquet(table_path).printSchema() it will print
> the correct schema with new column A. But if I do a 'describe table' from
> SparkSQLCLI I won't see the new column being added. I understand that this
> is because Hive doesn't support schema evolution. So what is the best way
> to support CLI queries in this situation? Do I need to manually alter the
> table everytime the underlying schema changes?
>
>  Thanks
>
> On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian  wrote:
>
>>  Hey Jerrick,
>>
>> What do you mean by "schema evolution with Hive metastore tables"? Hive
>> doesn't take schema evolution into account. Could you please give a
>> concrete use case? Are you trying to write Parquet data with extra columns
>> into an existing metastore Parquet table?
>>
>> Cheng
>>
>>
>> On 7/21/15 1:04 AM, Jerrick Hoang wrote:
>>
>> I'm new to Spark, any ideas would be much appreciated! Thanks
>>
>> On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang <
>> jerrickho...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>>  I'm aware of the support for schema evolution via DataFrame API. Just
>>> wondering what would be the best way to go about dealing with schema
>>> evolution with Hive metastore tables. So, say I create a table via SparkSQL
>>> CLI, how would I deal with Parquet schema evolution?
>>>
>>>  Thanks,
>>> J
>>>
>>
>>
>>
>​
>


Re: Is spark suitable for real time query

2015-07-22 Thread Louis Hust
My code is like below:
Map<String, String> t11opt = new HashMap<String, String>();
t11opt.put("url", DB_URL);
t11opt.put("dbtable", "t11");
DataFrame t11 = sqlContext.load("jdbc", t11opt);
t11.registerTempTable("t11");

// ... the same for t12, t21, t22

DataFrame t1 = t11.unionAll(t12);
t1.registerTempTable("t1");
DataFrame t2 = t21.unionAll(t22);
t2.registerTempTable("t2");
for (int i = 0; i < 10; i++) {
    System.out.println(new Date(System.currentTimeMillis()));
    DataFrame crossjoin = sqlContext.sql("select txt from t1 join t2 on t1.id = t2.id");
    crossjoin.show();
    System.out.println(new Date(System.currentTimeMillis()));
}

Where t11, t12, t21, t22 are all table DataFrames loaded via jdbc from a mysql
database which is local to the spark job.

But each loop iteration takes about 3 seconds. I do not know why it costs so much time.




2015-07-22 19:52 GMT+08:00 Robin East :

> Here’s an example using spark-shell on my laptop:
>
> sc.textFile("LICENSE").filter(_ contains "Spark").count
>
> This takes less than a second the first time I run it and is instantaneous
> on every subsequent run.
>
> What code are you running?
>
>
> On 22 Jul 2015, at 12:34, Louis Hust  wrote:
>
> I do a simple test using spark in standalone mode(not cluster),
>  and found a simple action take a few seconds, the data size is small,
> just few rows.
> So each spark job will cost some time for init or prepare work no matter
> what the job is?
> I mean if the basic framework of spark job will cost seconds?
>
> 2015-07-22 19:17 GMT+08:00 Robin East :
>
>> Real-time is, of course, relative but you’ve mentioned microsecond level.
>> Spark is designed to process large amounts of data in a distributed
>> fashion. No distributed system I know of could give any kind of guarantees
>> at the microsecond level.
>>
>> Robin
>>
>> > On 22 Jul 2015, at 11:14, Louis Hust  wrote:
>> >
>> > Hi, all
>> >
>> > I am using spark jar in standalone mode, fetch data from different
>> mysql instance and do some action, but i found the time is at second level.
>> >
>> > So i want to know if spark job is suitable for real time query which at
>> microseconds?
>>
>>
>
>


spark.files.userClassPathFirst=true Return Error - Please help

2015-07-22 Thread Ashish Soni
Hi All ,

I am getting the below error when I use the --conf
spark.files.userClassPathFirst=true parameter:

Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most
recent failure: Lost task 3.3 in stage 0.0 (TID 32, 10.200.37.161):
java.lang.ClassCastException: cannot assign instance of scala.None$ to
field org.apache.spark.scheduler.Task.metrics of type scala.Option in
instance of org.apache.spark.scheduler.ResultTask

I am using as below

spark-submit --conf spark.files.userClassPathFirst=true --driver-memory 6g
--executor-memory 12g --executor-cores 4   --class
com.ericsson.engine.RateDriver --master local
/home/spark/workspace/simplerating/target/simplerating-0.0.1-SNAPSHOT.jar
spark://eSPARKMASTER:7077 hdfs://enamenode/user/spark

thanks


spark.deploy.spreadOut core allocation

2015-07-22 Thread Srikanth
Hello,

I've set spark.deploy.spreadOut=false in spark-env.sh.

> export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4
> -Dspark.deploy.spreadOut=false"


There are 3 workers, each with 4 cores. Spark-shell was started with no. of
cores = 6.
The Spark UI shows that one executor was used with 6 cores.

Is this a bug? This is with Spark 1.4.


Srikanth


problems running Spark on a firewalled remote YARN cluster via SOCKS proxy

2015-07-22 Thread rok
I am trying to run Spark applications with the driver running locally and
interacting with a firewalled remote cluster via a SOCKS proxy. 

I have to modify the hadoop configuration on the *local machine* to try to
make this work, adding 


<property>
   <name>hadoop.rpc.socket.factory.class.default</name>
   <value>org.apache.hadoop.net.SocksSocketFactory</value>
</property>
<property>
   <name>hadoop.socks.server</name>
   <value>localhost:9998</value>
</property>

and on the *remote cluster* side

<property>
  <name>hadoop.rpc.socket.factory.class.default</name>
  <value>org.apache.hadoop.net.StandardSocketFactory</value>
  <final>true</final>
</property>

With this setup, and running "ssh -D 9998 gateway.host" to start the proxy
connection, MapReduce jobs started on the local machine execute fine on the
remote cluster. However, trying to launch a Spark job fails with the nodes
of the cluster apparently unable to communicate with one another: 

java.io.IOException: Failed on local exception: java.net.SocketException:
Connection refused; Host Details : local host is: "node3/10.211.55.103";
destination host is: "node1":8030;

Looking at the packets being sent to node1 from node3, it's clear that no
requests are made on port 8030, hinting that the connection is somehow being
proxied. 

Is it possible that the Spark job is not honoring the socket.factory
settings on the *cluster* side for some reason? 

Note that  Spark JIRA 5004
   addresses a similar
problem, though it looks like they are actually not the same (since in that
case it sounds like a standalone cluster is being used). 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problems-running-Spark-on-a-firewalled-remote-YARN-cluster-via-SOCKS-proxy-tp23955.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Parquet problems

2015-07-22 Thread Cheng Lian
How many columns are there in these Parquet files? Could you load a 
small portion of the original large dataset successfully?
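
For example, something along these lines (the path is illustrative):

// try one of the ~700 sources in isolation and check whether PermGen still blows up
val sample = sqlContext.read.parquet("hdfs:///path/to/one/source")
println(sample.schema.fields.length)    // how wide is the schema?
sample.limit(100).collect()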


Cheng

On 6/25/15 5:52 PM, Anders Arpteg wrote:


Yes, both the driver and the executors. Works a little bit better with 
more space, but still a leak that will cause failure after a number of 
reads. There are about 700 different data sources that needs to be 
loaded, lots of data...



tor 25 jun 2015 08:02 Sabarish Sasidharan 
> skrev:


Did you try increasing the perm gen for the driver?

Regards
Sab

On 24-Jun-2015 4:40 pm, "Anders Arpteg" mailto:arp...@spotify.com>> wrote:

When reading large (and many) datasets with the Spark 1.4.0
DataFrames parquet reader (the org.apache.spark.sql.parquet
format), the following exceptions are thrown:

Exception in thread "sk-result-getter-0"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "task-result-getter-0"
Exception in thread "task-result-getter-3"
java.lang.OutOfMemoryError: PermGen space
Exception in thread "task-result-getter-1"
java.lang.OutOfMemoryError: PermGen space
Exception in thread "task-result-getter-2"
java.lang.OutOfMemoryError: PermGen space

and many more like these from different threads. I've tried
increasing the PermGen space using the -XX:MaxPermSize VM
setting, but even after tripling the space, the same errors
occur. I've also tried storing intermediate results, and am
able to get the full job completed by running it multiple
times and starting for the last successful intermediate
result. There seems to be some memory leak in the parquet
format. Any hints on how to fix this problem?

Thanks,
Anders





Re: Spark-hive parquet schema evolution

2015-07-22 Thread Cheng Lian
Yeah, the benefit of `saveAsTable` is that you don't need to deal with 
schema explicitly, while the benefit of ALTER TABLE is you still have a 
standard vanilla Hive table.


Cheng

On 7/22/15 11:00 PM, Dean Wampler wrote:
While it's not recommended to overwrite files Hive thinks it 
understands, you can add the column to Hive's metastore using an ALTER 
TABLE command using HiveQL in the Hive shell or using HiveContext.sql():


ALTER TABLE mytable ADD COLUMNS (col_name data_type)

See 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AlterTable/Partition/Column 
for full details.
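
From Spark, the same statement can go through HiveContext.sql(); a minimal sketch, where
the column name and type are placeholders for the actual new Parquet column:

sqlContext.sql("ALTER TABLE mytable ADD COLUMNS (c2 STRING)")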


dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition 
 (O'Reilly)

Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 4:36 AM, Cheng Lian > wrote:


Since Hive doesn’t support schema evolution, you’ll have to update
the schema stored in the metastore somehow. For example, you can
create a new external table with the merged schema. Say you have a
Hive table t1:

CREATE TABLE t1 (c0 INT, c1 DOUBLE);

By default, this table is stored in the HDFS path
hdfs://some-host:9000/user/hive/warehouse/t1. Now you append
some Parquet data with an extra column c2 to the same directory:

import org.apache.spark.sql.types._
val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
val df1 = sqlContext.range(10).select(
  'id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
df1.write.mode("append").parquet(path)

Now you can create a new external table t2 like this:

val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
df2.write.path(path).saveAsTable("t2")

Since we specified a path above, the newly created t2 is an
external table pointing to the original HDFS location. But the
schema of t2 is the merged version.

The drawback of this approach is that t2 is actually a Spark
SQL specific data source table rather than a genuine Hive table.
This means it can be accessed by Spark SQL only. We’re just using
the Hive metastore to help persist metadata of the data source
table. However, since you’re asking how to access the new table
via the Spark SQL CLI, this should work for you. We are working on
making Parquet and ORC data source tables accessible via Hive in
Spark 1.5.0.

Cheng

On 7/22/15 10:32 AM, Jerrick Hoang wrote:


Hi Lian,

Sorry I'm new to Spark so I did not express myself very clearly.
I'm concerned about the situation when, let's say, I have a Parquet
table with some partitions and I add a new column A to the parquet schema
and write some data with the new schema to a new partition in the
table. If I'm not mistaken, if I do a
sqlContext.read.parquet(table_path).printSchema() it will print
the correct schema with the new column A. But if I do a 'describe
table' from the SparkSQL CLI I won't see the new column being added. I
understand that this is because Hive doesn't support schema
evolution. So what is the best way to support CLI queries in this
situation? Do I need to manually alter the table every time the
underlying schema changes?

Thanks

On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian
mailto:lian.cs@gmail.com>> wrote:

Hey Jerrick,

What do you mean by "schema evolution with Hive metastore
tables"? Hive doesn't take schema evolution into account.
Could you please give a concrete use case? Are you trying to
write Parquet data with extra columns into an existing
metastore Parquet table?

Cheng


On 7/21/15 1:04 AM, Jerrick Hoang wrote:

I'm new to Spark, any ideas would be much appreciated! Thanks

On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang
mailto:jerrickho...@gmail.com>> wrote:

Hi all,

I'm aware of the support for schema evolution via
DataFrame API. Just wondering what would be the best way
to go about dealing with schema evolution with Hive
metastore tables. So, say I create a table via SparkSQL
CLI, how would I deal with Parquet schema evolution?

Thanks,
J












Re: Parquet problems

2015-07-22 Thread Anders Arpteg
No, never really resolved the problem, except by increasing the permgen
space, which only partially solved it. Still have to restart the job
multiple times to make the whole job complete (it stores intermediate
results).

The parquet data sources have about 70 columns, and yes Cheng, it works
fine when only loading a small sample of the data.

Thankful for any hints,
Anders

On Wed, Jul 22, 2015 at 5:29 PM Cheng Lian  wrote:

>  How many columns are there in these Parquet files? Could you load a small
> portion of the original large dataset successfully?
>
> Cheng
>
>
> On 6/25/15 5:52 PM, Anders Arpteg wrote:
>
> Yes, both the driver and the executors. Works a little bit better with
> more space, but still a leak that will cause failure after a number of
> reads. There are about 700 different data sources that needs to be loaded,
> lots of data...
>
>  tor 25 jun 2015 08:02 Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> skrev:
>
> Did you try increasing the perm gen for the driver?
>>
>> Regards
>> Sab
>>
> On 24-Jun-2015 4:40 pm, "Anders Arpteg"  wrote:
>>
> When reading large (and many) datasets with the Spark 1.4.0 DataFrames
>>> parquet reader (the org.apache.spark.sql.parquet format), the following
>>> exceptions are thrown:
>>>
>>>  Exception in thread "sk-result-getter-0"
>>>
>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "task-result-getter-0"
>>> Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError:
>>> PermGen space
>>> Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError:
>>> PermGen space
>>> Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError:
>>> PermGen space
>>>
>>
>>>  and many more like these from different threads. I've tried increasing
>>> the PermGen space using the -XX:MaxPermSize VM setting, but even after
>>> tripling the space, the same errors occur. I've also tried storing
>>> intermediate results, and am able to get the full job completed by running
>>> it multiple times and starting for the last successful intermediate result.
>>> There seems to be some memory leak in the parquet format. Any hints on how
>>> to fix this problem?
>>>
>>>  Thanks,
>>> Anders
>>>
>>


Re: user threads in executors

2015-07-22 Thread Shushant Arora
Thanks !

I am using spark streaming 1.3, and if some post fails for any
reason, I will store the offset of that message in another kafka topic. I
want to read these offsets in another spark job and then fetch the original
kafka topic's messages based on these offsets.
So is it possible in a spark job to get kafka messages based on arbitrary
offsets? Or is there any better alternative to handle failure of a post
request?

On Wed, Jul 22, 2015 at 11:31 AM, Tathagata Das  wrote:

> Yes, you could unroll from the iterator in batch of 100-200 and then post
> them in multiple rounds.
> If you are using the Kafka receiver based approach (not Direct), then the
> raw Kafka data is stored in the executor memory. If you are using Direct
> Kafka, then it is read from Kafka directly at the time of filtering.
>
> TD
>
> On Tue, Jul 21, 2015 at 9:34 PM, Shushant Arora  > wrote:
>
>> I can post multiple items at a time.
>>
>> Data is being read from kafka and filtered; after that it's posted. Does 
>> foreachPartition
>> load the complete partition in memory or use an iterator of batches under the hood? If
>> the complete batch is not loaded, will using a custom size of 100-200 requests in
>> one batch per post help instead of the whole partition?
>>
>> On Wed, Jul 22, 2015 at 12:18 AM, Tathagata Das 
>> wrote:
>>
>>> If you can post multiple items at a time, then use foreachPartition to
>>> post the whole partition in a single request.
>>>
>>> On Tue, Jul 21, 2015 at 9:35 AM, Richard Marscher <
>>> rmarsc...@localytics.com> wrote:
>>>
 You can certainly create threads in a map transformation. We do this to
 do concurrent DB lookups during one stage for example. I would recommend,
 however, that you switch to mapPartitions from map as this allows you to
 create a fixed size thread pool to share across items on a partition as
 opposed to spawning a future per record in the RDD for example.

 On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Hi
>
> Can I create user threads in executors.
> I have a streaming app where after processing I have a requirement to
> push events to external system . Each post request costs ~90-100 ms.
>
> To make the posts parallel, I can not use the same thread because that is
> limited by the no. of cores available in the system. Can I use user threads in 
> spark
> App? I tried to create 2 threads in a map task and it worked.
>
> Is there any upper limit on the no. of user threads in a spark executor? Is
> it a good idea to create user threads in a spark map task?
>
> Thanks
>
>


 --
 *Richard Marscher*
 Software Engineer
 Localytics
 Localytics.com  | Our Blog
  | Twitter 
  | Facebook  | LinkedIn
 

>>>
>>>
>>
>
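
As an illustration of the mapPartitions-with-a-fixed-pool pattern Richard describes
above, here is a minimal Scala sketch; the pool size and the post() call are
assumptions standing in for the real HTTP client:

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

rdd.mapPartitions { records =>
  // One small fixed pool per task, rather than one thread per record.
  val pool = Executors.newFixedThreadPool(8)               // assumed pool size
  implicit val ec = ExecutionContext.fromExecutorService(pool)
  val futures = records.map(r => Future(post(r))).toList   // post() = the external call (assumed)
  val responses = futures.map(f => Await.result(f, 30.seconds))
  pool.shutdown()
  responses.iterator
}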


Re: user threads in executors

2015-07-22 Thread Cody Koeninger
Yes, look at KafkaUtils.createRDD
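
A minimal sketch of that approach in Scala (Spark 1.3); the topic name, broker list
and offsets are made up, standing in for whatever was written to the "failed posts"
topic:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// Each range replays [fromOffset, untilOffset) for one topic-partition.
val offsetRanges = Array(
  OffsetRange("events", 0, 1234L, 1235L),
  OffsetRange("events", 3, 987L, 990L)
)

val replayRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)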

On Wed, Jul 22, 2015 at 11:17 AM, Shushant Arora 
wrote:

> Thanks !
>
> I am using spark streaming 1.3 , And if some post fails because of any
> reason, I will store the offset of that message in another kafka topic. I
> want to read these offsets in another spark job  and from them the original
> kafka topic's messages based on these offsets-
>  So is it possible in spark job to get kafka messages based on random
> offsets ? Or is there any better alternative to handle failure of post
> request?
>
> On Wed, Jul 22, 2015 at 11:31 AM, Tathagata Das 
> wrote:
>
>> Yes, you could unroll from the iterator in batch of 100-200 and then post
>> them in multiple rounds.
>> If you are using the Kafka receiver based approach (not Direct), then the
>> raw Kafka data is stored in the executor memory. If you are using Direct
>> Kafka, then it is read from Kafka directly at the time of filtering.
>>
>> TD
>>
>> On Tue, Jul 21, 2015 at 9:34 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> I can post multiple items at a time.
>>>
>>> Data is being read from kafka and filtered after that its posted . Does 
>>> foreachPartition
>>> load complete partition in memory or use an iterator of batch underhood? If
>>> compete batch is not loaded will using custim size of 100-200 request in
>>> one batch and post will help instead of whole partition ?
>>>
>>> On Wed, Jul 22, 2015 at 12:18 AM, Tathagata Das 
>>> wrote:
>>>
 If you can post multiple items at a time, then use foreachPartition to
 post the whole partition in a single request.

 On Tue, Jul 21, 2015 at 9:35 AM, Richard Marscher <
 rmarsc...@localytics.com> wrote:

> You can certainly create threads in a map transformation. We do this
> to do concurrent DB lookups during one stage for example. I would
> recommend, however, that you switch to mapPartitions from map as this
> allows you to create a fixed size thread pool to share across items on a
> partition as opposed to spawning a future per record in the RDD for 
> example.
>
> On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Hi
>>
>> Can I create user threads in executors.
>> I have a streaming app where after processing I have a requirement to
>> push events to external system . Each post request costs ~90-100 ms.
>>
>> To make post parllel, I can not use same thread because that is
>> limited by no of cores available in system , can I useuser therads in 
>> spark
>> App? I tried to create 2 thredas in a map tasks and it worked.
>>
>> Is there any upper limit on no of user threds in spark executor ? Is
>> it a good idea to create user threads in spark map task?
>>
>> Thanks
>>
>>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com  | Our Blog
>  | Twitter 
>  | Facebook  | LinkedIn
> 
>


>>>
>>
>


Help accessing protected S3

2015-07-22 Thread Greg Anderson
I have a protected s3 bucket that requires a certain IAM role to access.  When 
I start my cluster using the spark-ec2 script, everything works just fine until 
I try to read from that part of s3.  Here is the command I am using:

./spark-ec2 -k KEY -i KEY_FILE.pem --additional-security-group=IAM_ROLE 
--copy-aws-credentials --zone=us-east-1e -t m1.large --worker-instances=3 
--hadoop-major-version=2.7.1 --user-data=test.sh launch my-cluster

I have read through this article: 
http://apache-spark-user-list.1001560.n3.nabble.com/S3-Bucket-Access-td16303.html

The problem seems to be very similar, but I wasn't able to find a solution in 
it for me.  I'm not sure what else to provide here, just let me know what you 
need.  Thanks in advance!
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark spark.shuffle.memoryFraction has no affect

2015-07-22 Thread Walid Baroni
Hi Andrew

I tried many different combinations, but still see no change in the amount of 
shuffle bytes spilled to disk when checking the UI. I made sure the configurations 
have been applied by checking Spark UI/Environment. I only see changes in 
shuffle bytes spilled if I disable spark.shuffle.spill.


> On Jul 22, 2015, at 3:15 AM, Andrew Or  wrote:
> 
> Hi,
> 
> The setting of 0.2 / 0.6 looks reasonable to me. Since you are not using 
> caching at all, have you tried trying something more extreme, like 0.1 / 0.9? 
> Since disabling spark.shuffle.spill didn't cause an OOM this setting should 
> be fine. Also, one thing you could do is to verify the shuffle bytes spilled 
> on the UI before and after the change.
> 
> Let me know if that helped.
> -Andrew
> 
> 2015-07-21 13:50 GMT-07:00 wdbaruni  >:
> Hi
> I am testing Spark on Amazon EMR using Python and the basic wordcount
> example shipped with Spark.
> 
> After running the application, I realized that in Stage 0 reduceByKey(add),
> around 2.5GB shuffle is spilled to memory and 4GB shuffle is spilled to
> disk. Since in the wordcount example I am not caching or persisting any
> data, so I thought I can increase the performance of this application by
> giving more shuffle memoryFraction. So, in spark-defaults.conf, I added the
> following:
> 
> spark.storage.memoryFraction0.2
> spark.shuffle.memoryFraction0.6
> 
> However, I am still getting the same performance and the same amount of
> shuffle data is being spilled to disk and memory. I validated that Spark is
> reading these configurations using Spark UI/Environment and I can see my
> changes. Moreover, I tried setting spark.shuffle.spill to false and I got
> the performance I am looking for and all shuffle data was spilled to memory
> only.
> 
> So, what am I getting wrong here and why not the extra shuffle memory
> fraction is not utilized?
> 
> *My environment:*
> Amazon EMR with Spark 1.3.1 running using -x argument
> 1 Master node: m3.xlarge
> 3 Core nodes: m3.xlarge
> Application: wordcount.py
> Input: 10 .gz files 90MB each (~350MB unarchived) stored in S3
> 
> *Submit command:*
> /home/hadoop/spark/bin/spark-submit --deploy-mode client /mnt/wordcount.py
> s3n://
> 
> *spark-defaults.conf:*
> spark.eventLog.enabled  false
> spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
> spark.driver.extraJavaOptions   -Dspark.driver.log.level=INFO
> spark.masteryarn
> spark.executor.instances3
> spark.executor.cores4
> spark.executor.memory   9404M
> spark.default.parallelism   12
> spark.eventLog.enabled  true
> spark.eventLog.dir  hdfs:///spark-logs/
> spark.storage.memoryFraction0.2
> spark.shuffle.memoryFraction0.6
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-spark-shuffle-memoryFraction-has-no-affect-tp23944.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: Parquet problems

2015-07-22 Thread Jerry Lam
Hi guys,

I noticed that too. Anders, can you confirm that it works on the Spark 1.5
snapshot? This is what I tried in the end. It seems it is a 1.4 issue.

Best Regards,

Jerry

On Wed, Jul 22, 2015 at 11:46 AM, Anders Arpteg  wrote:

> No, never really resolved the problem, except by increasing the permgem
> space which only partially solved it. Still have to restart the job
> multiple times so make the whole job complete (it stores intermediate
> results).
>
> The parquet data sources have about 70 columns, and yes Cheng, it works
> fine when only loading a small sample of the data.
>
> Thankful for any hints,
> Anders
>
> On Wed, Jul 22, 2015 at 5:29 PM Cheng Lian  wrote:
>
>>  How many columns are there in these Parquet files? Could you load a
>> small portion of the original large dataset successfully?
>>
>> Cheng
>>
>>
>> On 6/25/15 5:52 PM, Anders Arpteg wrote:
>>
>> Yes, both the driver and the executors. Works a little bit better with
>> more space, but still a leak that will cause failure after a number of
>> reads. There are about 700 different data sources that needs to be loaded,
>> lots of data...
>>
>>  tor 25 jun 2015 08:02 Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> skrev:
>>
>> Did you try increasing the perm gen for the driver?
>>>
>>> Regards
>>> Sab
>>>
>> On 24-Jun-2015 4:40 pm, "Anders Arpteg"  wrote:
>>>
>> When reading large (and many) datasets with the Spark 1.4.0 DataFrames
 parquet reader (the org.apache.spark.sql.parquet format), the following
 exceptions are thrown:

  Exception in thread "sk-result-getter-0"

>>> Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread "task-result-getter-0"
 Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError:
 PermGen space

>>>
  and many more like these from different threads. I've tried
 increasing the PermGen space using the -XX:MaxPermSize VM setting, but even
 after tripling the space, the same errors occur. I've also tried storing
 intermediate results, and am able to get the full job completed by running
 it multiple times and starting for the last successful intermediate result.
 There seems to be some memory leak in the parquet format. Any hints on how
 to fix this problem?

  Thanks,
 Anders

>>>


Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Stahlman, Jonathan
Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: , Stahlman Jonathan 
mailto:jonathan.stahl...@capitalone.com>>
Date: Thursday, July 16, 2015 at 2:18 PM
To: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been 
studying its output with various model configurations.  Ideally I would like to 
be able to run one job that trains the recommendation model with many different 
configurations to try to optimize for performance.  A sample code in python is 
copied below.

The issue I have is that each new model which is trained caches a set of RDDs 
and eventually the executors run out of memory.  Is there any way in Pyspark to 
unpersist() these RDDs after each iteration?  The names of the RDDs which I 
gather from the UI is:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in 
itertools.product( functions, ranks, iterations, lambdas, alphas ):
#train model
ratings_train = data_train.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
model   = ALS.trainImplicit( ratings_train, rank, numIterations, 
lambda_=float(m_lambda), alpha=float(m_alpha) )

#test performance on CV data
ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
auc = areaUnderCurve( ratings_cv, model.predictAll )

#save results
result = ",".join(str(l) for l in 
[ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
results.append(result)
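
If the goal is simply to free whatever a finished run left cached before the next 
configuration starts, one blunt workaround is to unpersist everything the context 
still tracks; a sketch in Scala (PySpark does not expose getPersistentRDDs directly, 
so treat dropping to the Scala/JVM side as an assumption):

// Only safe if nothing else in the job is meant to stay cached.
sc.getPersistentRDDs.values.foreach(_.unpersist(blocking = false))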


The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Burak Yavuz
Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything.
That's why the unpersist has an if statement before it.
Could you give more information about your setup please? Number of cores,
memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan <
jonathan.stahl...@capitalone.com> wrote:

> Hello again,
>
> In trying to understand the caching of intermediate RDDs by ALS, I looked
> into the source code and found what may be a bug.  Looking here:
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230
>
> you see that ALS.train() is being called with finalRDDStorageLevel =
> StorageLevel.NONE, which I would understand to mean that the intermediate
> RDDs will not be persisted.  Looking here:
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631
>
> unpersist() is only being called on the intermediate RDDs (all the *Blocks
> RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.
>
>
> This doesn’t make sense to me – I would expect the RDDs to be removed from
> the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way
> around.
>
> Jonathan
>
>
> From: , Stahlman Jonathan 
> Date: Thursday, July 16, 2015 at 2:18 PM
> To: "user@spark.apache.org" 
> Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
>
> Hello all,
>
> I am running the Spark recommendation algorithm in MLlib and I have been
> studying its output with various model configurations.  Ideally I would
> like to be able to run one job that trains the recommendation model with
> many different configurations to try to optimize for performance.  A sample
> code in python is copied below.
>
> The issue I have is that each new model which is trained caches a set of
> RDDs and eventually the executors run out of memory.  Is there any way in
> Pyspark to unpersist() these RDDs after each iteration?  The names of the
> RDDs which I gather from the UI is:
>
> itemInBlocks
> itemOutBlocks
> Products
> ratingBlocks
> userInBlocks
> userOutBlocks
> users
>
> I am using Spark 1.3.  Thank you for any help!
>
> Regards,
> Jonathan
>
>
>
>
>   data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
>   functions = [rating] #defined elsewhere
>   ranks = [10,20]
>   iterations = [10,20]
>   lambdas = [0.01,0.1]
>   alphas  = [1.0,50.0]
>
>   results = []
>   for ratingFunction, rank, numIterations, m_lambda, m_alpha in
> itertools.product( functions, ranks, iterations, lambdas, alphas ):
> #train model
> ratings_train = data_train.map(lambda l: Rating( l.user, l.product,
> ratingFunction(l) ) )
> model   = ALS.trainImplicit( ratings_train, rank, numIterations,
> lambda_=float(m_lambda), alpha=float(m_alpha) )
>
> #test performance on CV data
> ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product,
> ratingFunction(l) ) )
> auc = areaUnderCurve( ratings_cv, model.predictAll )
>
> #save results
> result = ",".join(str(l) for l in
> [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
> results.append(result)
>
>


Re: Parquet problems

2015-07-22 Thread Michael Misiewicz
For what it's worth, my data set has around 85 columns in Parquet format as
well. I have tried bumping the permgen up to 512m but I'm still getting
errors in the driver thread.
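
(For reference, the flag under discussion is normally applied on both sides, e.g. in
spark-defaults.conf or via --conf / --driver-java-options on spark-submit; the 512m
below is just the value mentioned above, not a recommendation:)

spark.driver.extraJavaOptions   -XX:MaxPermSize=512m
spark.executor.extraJavaOptions -XX:MaxPermSize=512m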

On Wed, Jul 22, 2015 at 1:20 PM, Jerry Lam  wrote:

> Hi guys,
>
> I noticed that too. Anders, can you confirm that it works on Spark 1.5
> snapshot? This is what I tried at the end. It seems it is 1.4 issue.
>
> Best Regards,
>
> Jerry
>
> On Wed, Jul 22, 2015 at 11:46 AM, Anders Arpteg 
> wrote:
>
>> No, never really resolved the problem, except by increasing the permgem
>> space which only partially solved it. Still have to restart the job
>> multiple times so make the whole job complete (it stores intermediate
>> results).
>>
>> The parquet data sources have about 70 columns, and yes Cheng, it works
>> fine when only loading a small sample of the data.
>>
>> Thankful for any hints,
>> Anders
>>
>> On Wed, Jul 22, 2015 at 5:29 PM Cheng Lian  wrote:
>>
>>>  How many columns are there in these Parquet files? Could you load a
>>> small portion of the original large dataset successfully?
>>>
>>> Cheng
>>>
>>>
>>> On 6/25/15 5:52 PM, Anders Arpteg wrote:
>>>
>>> Yes, both the driver and the executors. Works a little bit better with
>>> more space, but still a leak that will cause failure after a number of
>>> reads. There are about 700 different data sources that needs to be loaded,
>>> lots of data...
>>>
>>>  tor 25 jun 2015 08:02 Sabarish Sasidharan <
>>> sabarish.sasidha...@manthan.com> skrev:
>>>
>>> Did you try increasing the perm gen for the driver?

 Regards
 Sab

>>> On 24-Jun-2015 4:40 pm, "Anders Arpteg"  wrote:

>>> When reading large (and many) datasets with the Spark 1.4.0 DataFrames
> parquet reader (the org.apache.spark.sql.parquet format), the following
> exceptions are thrown:
>
>  Exception in thread "sk-result-getter-0"
>
 Exception: java.lang.OutOfMemoryError thrown from the
> UncaughtExceptionHandler in thread "task-result-getter-0"
> Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError:
> PermGen space
> Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError:
> PermGen space
> Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError:
> PermGen space
>

>  and many more like these from different threads. I've tried
> increasing the PermGen space using the -XX:MaxPermSize VM setting, but 
> even
> after tripling the space, the same errors occur. I've also tried storing
> intermediate results, and am able to get the full job completed by running
> it multiple times and starting for the last successful intermediate 
> result.
> There seems to be some memory leak in the parquet format. Any hints on how
> to fix this problem?
>
>  Thanks,
> Anders
>

>


How to keep RDDs in memory between two different batch jobs?

2015-07-22 Thread swetha
Hi,

We have a requirement wherein we need to keep RDDs in memory between Spark
batch processing that happens every one hour. The idea here is to have RDDs
that have active user sessions in memory between two jobs so that once a job
processing is  done and another job is run after an hour the RDDs with
active sessions are still available for joining with those in the current
job. So, what do we need to keep the data in memory in between two batch
jobs? Can we use Tachyon?

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Stahlman, Jonathan
Hi Burak,

Looking at the source code, the intermediate RDDs used in ALS.train() are 
persisted during the computation using intermediateRDDStorageLevel (default 
value is StorageLevel.MEMORY_AND_DISK) - see 
here,
 
here,
 and 
here.
  At the end of the ALS calculation, these RDDs are no longer needed nor 
returned, so I would assume the logical choice would be to unpersist() these 
RDDs.  The strategy in the code seems to be set by finalRDDStorageLevel, which 
for some reason only calls unpersist() on the intermediate RDDs if  
finalRDDStorageLevel != StorageLevel.NONE, which seems counter-intuitive to me.

Jonathan

From: Burak Yavuz mailto:brk...@gmail.com>>
Date: Wednesday, July 22, 2015 at 10:47 AM
To: Stahlman Jonathan 
mailto:jonathan.stahl...@capitalone.com>>
Cc: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything. That's 
why the unpersist has an if statement before it.
Could you give more information about your setup please? Number of cores, 
memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan 
mailto:jonathan.stahl...@capitalone.com>> 
wrote:
Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: , Stahlman Jonathan 
mailto:jonathan.stahl...@capitalone.com>>
Date: Thursday, July 16, 2015 at 2:18 PM
To: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been 
studying its output with various model configurations.  Ideally I would like to 
be able to run one job that trains the recommendation model with many different 
configurations to try to optimize for performance.  A sample code in python is 
copied below.

The issue I have is that each new model which is trained caches a set of RDDs 
and eventually the executors run out of memory.  Is there any way in Pyspark to 
unpersist() these RDDs after each iteration?  The names of the RDDs which I 
gather from the UI is:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in 
itertools.product( functions, ranks, iterations, lambdas, alphas ):
#train model
ratings_train = data_train.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
model   = ALS.trainImplicit( ratings_train, rank, numIterations, 
lambda_=float(m_lambda), alpha=float(m_alpha) )

#test performance on CV data
ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product, 
ratingFunction(l) ) )
auc = areaUnderCurve( ratings_cv, model.predictAll )

#save results
result = ",".join(str(l) for l in 
[ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
results.append(result)




Performance issue with Spark's foreachPartition method

2015-07-22 Thread diplomatic Guru
Hello all,

We are having a major performance issue with Spark, which is holding us
from going live.

We have a job that carries out computation on log files and writes the
results into an Oracle DB.

The reducer 'reduceByKey' has been set to parallelize by 4 as we don't
want to establish too many DB connections.

We are then calling the foreachPartition on the RDD pairs that were reduced
by the key. Within this foreachPartition method we establish DB connection,
then iterate the results, prepare the Oracle statement for batch insertion
then we commit the batch and close the connection. All these are working
fine.

However, when we execute the job to process 12GB of data, it takes forever
to complete, especially at the foreachPartition stage.

We submitted the job with 6 executors, 2 cores, and 6GB memory of which 0.3
is assigned to spark.storage.memoryFraction.

The job is taking about 50 minutes to complete, which is not ideal. I'm not
sure how we could enhance the performance. I've provided the main body of
the code; please take a look and advise:

From Driver:

reduceResultsRDD.foreachPartition(new DB.InsertFunction(
dbuser,dbpass,batchsize));


DB class:

public class DB {
    private static final Logger logger = LoggerFactory
            .getLogger(DB.class);

    public static class InsertFunction implements
            VoidFunction<Iterator<Tuple2<String, String>>> {

        private static final long serialVersionUID = 55766876878L;
        private String dbuser = "";
        private String dbpass = "";
        private int batchsize;

        public InsertFunction(String dbuser, String dbpass, int batchsize) {
            super();
            this.dbuser = dbuser;
            this.dbpass = dbpass;
            this.batchsize = batchsize;
        }

        @Override
        public void call(Iterator<Tuple2<String, String>> results) throws Exception {
            Connection connect = null;
            PreparedStatement pstmt = null;
            try {
                connect = getDBConnection(dbuser, dbpass);

                int count = 0;

                if (batchsize <= 0) {
                    batchsize = 1;
                }

                pstmt = connect
                        .prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");

                while (results.hasNext()) {

                    Tuple2<String, String> kv = results.next();
                    String[] data = kv._1.concat("," + kv._2).split(",");

                    pstmt.setString(1, data[0].toString());
                    pstmt.setString(2, data[1].toString());
                    ...

                    pstmt.addBatch();
                    count++;

                    // Flush and commit once the batch is full.
                    if (count == batchsize) {
                        logger.info("BulkCount : " + count);
                        pstmt.executeBatch();
                        connect.commit();
                        count = 0;
                    }

                    // Note: this also flushes and commits on every record,
                    // in addition to the batched flush above.
                    pstmt.executeBatch();
                    connect.commit();
                }

                pstmt.executeBatch();
                connect.commit();

            } catch (Exception e) {
                logger.error("InsertFunction error: " + e.getMessage());
            } finally {
                if (pstmt != null) {
                    pstmt.close();
                }
                try {
                    connect.close();
                } catch (SQLException e) {
                    logger.error("InsertFunction Connection Close error: "
                            + e.getMessage());
                }
            }
        }
    }
}
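
For comparison, a compact Scala sketch of the same loop that issues exactly one
executeBatch/commit per fixed-size chunk of the partition iterator; getDBConnection,
the MERGE statement and the two-column binding are carried over from the Java code
above as assumptions:

reduceResultsRDD.foreachPartition { results =>
  val connect = getDBConnection(dbuser, dbpass)
  val pstmt = connect.prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT")
  try {
    results.grouped(batchsize max 1).foreach { chunk =>
      chunk.foreach { case (k, v) =>
        val data = (k + "," + v).split(",")
        pstmt.setString(1, data(0))
        pstmt.setString(2, data(1))
        pstmt.addBatch()
      }
      pstmt.executeBatch()   // one round trip per chunk...
      connect.commit()       // ...and one commit per chunk
    }
  } finally {
    pstmt.close()
    connect.close()
  }
}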


RE: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Ganelin, Ilya
To be unpersisted, the RDD must be persisted first. If it's set to NONE, then 
it's not persisted, and as such does not need to be freed. Does that make sense?



Thank you,
Ilya Ganelin



-Original Message-
From: Stahlman, Jonathan 
[jonathan.stahl...@capitalone.com]
Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: , Stahlman Jonathan 
mailto:jonathan.stahl...@capitalone.com>>
Date: Thursday, July 16, 2015 at 2:18 PM
To: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been 
studying its output with various model configurations.  Ideally I would like to 
be able to run one job that trains the recommendation model with many different 
configurations to try to optimize for performance.  A sample code in python is 
copied below.

The issue I have is that each new model which is trained caches a set of RDDs 
and eventually the executors run out of memory.  Is there any way in Pyspark to 
unpersist() these RDDs after each iteration?  The names of the RDDs which I 
gather from the UI is:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in 
itertools.product( functions, ranks, iterations, lambdas, alphas ):
#train model
ratings_train = data_train.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
model   = ALS.trainImplicit( ratings_train, rank, numIterations, 
lambda_=float(m_lambda), alpha=float(m_alpha) )

#test performance on CV data
ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product, 
ratingFunction(l) ) )
auc = areaUnderCurve( ratings_cv, model.predictAll )

#save results
result = ",".join(str(l) for l in 
[ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
results.append(result)





Re: spark.deploy.spreadOut core allocation

2015-07-22 Thread Andrew Or
Hi Srikanth,

It does look like a bug. Did you set `spark.executor.cores` in your
application by any chance?

-Andrew

2015-07-22 8:05 GMT-07:00 Srikanth :

> Hello,
>
> I've set spark.deploy.spreadOut=false in spark-env.sh.
>
>> export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4
>> -Dspark.deploy.spreadOut=false"
>
>
> There are 3 workers each with 4 cores. Spark-shell was started with noof
> cores = 6.
> Spark UI show that one executor was used with 6 cores.
>
> Is this a bug? This is with Spark 1.4.
>
> [image: Inline image 1]
>
> Srikanth
>


Re: log4j.xml bundled in jar vs log4.properties in spark/conf

2015-07-22 Thread Steve Loughran
relying on classpath loading is very brittle. You can use a system property 
(see https://logging.apache.org/log4j/1.2/manual.html ) to specify your own 
log4j file if you can

For example:  -Dlog4j.configuration=mylog4j.properties


> On 21 Jul 2015, at 00:57, igor.berman  wrote:
> 
> Hi,
> I have log4j.xml in my jar
> From 1.4.1 it seems that log4j.properties in spark/conf is defined first in
> classpath so the spark.conf/log4j.properties "wins"
> before that (in v1.3.0) log4j.xml bundled in jar defined the configuration
> 
> if I manually add my jar to be strictly first in classpath(by adding it to
> SPARK_CLASSPATH in spark-env.sh) log4j.xml in jar wins
> 
> do somebody knows what changed? any ideas?
> ps: tried to use spark.driver.userClassPathFirst=true &
> spark.executor.userClassPathFirst=true, however I'm getting strange errors
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/log4j-xml-bundled-in-jar-vs-log4-properties-in-spark-conf-tp23923.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to keep RDDs in memory between two different batch jobs?

2015-07-22 Thread ericacm
Tachyon is one way.  Also check out the  Spark Job Server
  .



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957p23958.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to keep RDDs in memory between two different batch jobs?

2015-07-22 Thread ericacm
Actually, I should clarify - Tachyon is a way to keep your data in RAM, but
it's not exactly the same as keeping it cached in Spark.  Spark Job Server
is a way to keep it cached in Spark.
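
For the Tachyon route specifically, the usual pattern is to treat it as a
Hadoop-compatible filesystem shared by the two jobs; a rough sketch, where the
tachyon:// master address and the (String, Long) session representation are
assumptions, and activeSessions/currentSessions stand for the session RDDs of the
two jobs:

// Job 1 (this hour): write the active sessions out to Tachyon.
activeSessions.saveAsObjectFile("tachyon://tachyon-master:19998/sessions/active")

// Job 2 (an hour later): read them back and join with the current batch.
val previous = sc.objectFile[(String, Long)]("tachyon://tachyon-master:19998/sessions/active")
val joined = currentSessions.join(previous)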



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957p23959.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark streaming 1.3 issues

2015-07-22 Thread Shushant Arora
In spark streaming 1.3 -

Say I have 10 executors, each with 4 cores, so in total 40 tasks run in parallel
at once. If I repartition the kafka directstream to 40 partitions vs. say I have
300 partitions in the kafka topic - which one will be more efficient? Should I
repartition the kafka stream to equal the number of cores or keep it the same as 300?

If I have a number of partitions greater than the number of parallel tasks, will
that not cause the overhead of task scheduling?
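
(A small Scala sketch of the pattern TD describes below: take the offset ranges from
the direct stream's RDD before any repartitioning, since the repartitioned RDD no
longer carries HasOffsetRanges; the 40 is just the parallelism discussed above:)

import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val repartitioned = rdd.repartition(40)   // optional, only if the downstream work needs it
  // ... process repartitioned, then record offsetRanges ...
}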

On Wed, Jul 22, 2015 at 11:37 AM, Tathagata Das  wrote:

> For Java, do
>
> OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd*.rdd()*).
> offsetRanges();
>
> If you fix that error, you should be seeing data.
>
> You can call arbitrary RDD operations on a DStream, using
> DStream.transform. Take a look at the docs.
>
> For the direct kafka approach you are doing,
> - tasks do get launched for empty partitions
> - driver may make multiple calls to Kafka brokers to get all the offset
> information. But that does not mean you should reduce partitions. the whole
> point of having large number of partition is the consume the data in
> parallel. If you reduce the number of partitions, that defeats the purpose
> of having partitoins at all. And the driver making calls for getting
> metadata (i.e. offsets) isnt very costly, nor is it a bottleneck usually.
> Rather receiving and processing the actual data is usually the bottleneck
> and to increase throughput you should have larger number of partitions.
>
>
>
> On Tue, Jul 21, 2015 at 1:02 AM, Akhil Das 
> wrote:
>
>> I'd suggest you upgrading to 1.4 as it has better metrices and UI.
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Is coalesce not applicable to kafkaStream ? How to do coalesce on
>>> kafkadirectstream its not there in api ?
>>> Will calling repartition on the directstream with the number of executors as
>>> numPartitions improve performance?
>>>
>>> Does in 1.3 tasks get launched for partitions which are empty? Does
>>> driver makes call for getting offsets of each partition separately or in
>>> single call it gets all partitions new offsets ? I mean will reducing no of
>>>  partitions oin kafka help improving the performance?
>>>
>>> On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Hi

 1.I am using spark streaming 1.3 for reading from a kafka queue and
 pushing events to external source.

 I passed in my job 20 executors but it is showing only 6 in executor
 tab ?
 When I used highlevel streaming 1.2 - its showing 20 executors. My
 cluster is 10 node yarn cluster with each node has 8 cores.

 I am calling the script as :

 spark-submit --class classname --num-executors 10 --executor-cores 2
 --master yarn-client jarfile

 2. On Streaming UI

 Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
 Time since start: 13 minutes 28 seconds
 Network receivers: 0
 Batch interval: 1 second
 Processed batches: 807
 Waiting batches: 0
 Received records: 0
 Processed records: 0

 Received records and processed records are always 0 . And Speed of
 processing is slow compare to highlevel api.

 I am procesing the stream using mapPartition.

 When I used
 directKafkaStream.foreachRDD(new Function,
 Void>() {
  @Override
 public Void call(JavaPairRDD rdd) throws Exception {
 // TODO Auto-generated method stub
 OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges();
 }
 }

 It throws an exception
 java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD
 cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

 Thanks
 Shushant







>>>
>>
>


Re: spark.deploy.spreadOut core allocation

2015-07-22 Thread Andrew Or
Hi Srikanth,

I was able to reproduce the issue by setting `spark.cores.max` to a number
greater than the number of cores on a worker. I've filed SPARK-9260 which I
believe is already being fixed in https://github.com/apache/spark/pull/7274.

Thanks for reporting the issue!
-Andrew

2015-07-22 11:49 GMT-07:00 Andrew Or :

> Hi Srikanth,
>
> It does look like a bug. Did you set `spark.executor.cores` in your
> application by any chance?
>
> -Andrew
>
> 2015-07-22 8:05 GMT-07:00 Srikanth :
>
>> Hello,
>>
>> I've set spark.deploy.spreadOut=false in spark-env.sh.
>>
>>> export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4
>>> -Dspark.deploy.spreadOut=false"
>>
>>
>> There are 3 workers each with 4 cores. Spark-shell was started with noof
>> cores = 6.
>> Spark UI show that one executor was used with 6 cores.
>>
>> Is this a bug? This is with Spark 1.4.
>>
>> [image: Inline image 1]
>>
>> Srikanth
>>
>
>


Re: Performance issue with Spark's foreachPartition method

2015-07-22 Thread Robin East
The first question I would ask is have you determined whether you have a 
performance issue writing to Oracle? In particular how many commits are you 
making? If you are issuing a lot of commits that would be a performance problem.

Robin

> On 22 Jul 2015, at 19:11, diplomatic Guru  wrote:
> 
> Hello all,
> 
> We are having a major performance issue with the Spark, which is holding us 
> from going live.
> 
> We have a job that carries out computation on log files and write the results 
> into Oracle DB.
> 
> The reducer 'reduceByKey'  have been set to parallelize by 4 as we don't want 
> to establish too many DB connections.
> 
> We are then calling the foreachPartition on the RDD pairs that were reduced 
> by the key. Within this foreachPartition method we establish DB connection, 
> then iterate the results, prepare the Oracle statement for batch insertion 
> then we commit the batch and close the connection. All these are working fine.
> 
> However, when we execute the job to process 12GB of data, it takes forever to 
> complete, especially at the foreachPartition stage.
> 
> We submitted the job with 6 executors, 2 cores, and 6GB memory of which 0.3 
> is assigned to spark.storage.memoryFraction.
> 
> The job is taking about 50 minutes to complete, which is not ideal. I'm not 
> sure how we could enhance the performance. I've provided the main body of the 
> codes, please take a look and advice:
> 
> From Driver:
> 
> reduceResultsRDD.foreachPartition(new DB.InsertFunction( 
> dbuser,dbpass,batchsize));
> 
> 
> DB class:
> 
> public class DB {
>   private static final Logger logger = LoggerFactory
>   .getLogger(DB.class);
>   
> public static class InsertFunction implements
>   VoidFunction>> {
> 
>   private static final long serialVersionUID = 55766876878L;
>   private String dbuser = "";
>   private String dbpass = "";
>   private int batchsize;
> 
>   public InsertFunction(String dbuser, String dbpass, int 
> batchsize) {
>   super();
>   this.dbuser = dbuser;
>   this.dbuser = dbuser;
>   this.batchsize=batchsize;
>   }
> 
> @Override
>   public void call(Iterator> results) {
>   Connection connect = null;
>   PreparedStatement pstmt = null;
>   try {
>   connect = getDBConnection(dbuser,
>   dbpass);
> 
>   int count = 0;
> 
>   if (batchsize <= 0) {
>   batchsize = 1;
>   }
> 
>   pstmt1 = connect
>   .prepareStatement("MERGE INTO 
> SOME TABLE IF RECORD FOUND, IF NOT INSERT");
> 
>   while (results.hasNext()) {
> 
>   Tuple2 kv = 
> results.next();
>   
>   String [] data = 
> kv._1.concat("," +kv._2).split(",");
> 
>   
>   pstmt.setString(1, data[0].toString());
>   pstmt.setString(2, data[1].toString());
>.
> 
>   pstmt.addBatch();
> 
>   count++;
> 
>   if (count == batchsize) {
>   logger.info 
> ("BulkCount : " + count);
>   pstmt.executeBatch();
>   connect.commit();
>   count = 0;
>   }
> 
>   pstmt.executeBatch();
>   connect.commit();
> 
>   }
> 
>   pstmt.executeBatch();
>   connect.commit();
> 
>   } catch (Exception e) {
>   logger.error("InsertFunction error: " + 
> e.getMessage());
>   } finally {
> 
>   if (pstmt != null) {
>   pstmt.close();
>   }
> 
>   try {
>   
>   connect.close();
>   } catch (SQLException e) {
>   logger.error("InsertFunction Connection 
> Close error: "
>   + e.getMessage());
>   }
>   }
>   }
> 
>   }

Re: How to share a Map among RDDS?

2015-07-22 Thread Dan Dong
Hi, Andrew,
  If I broadcast the Map:
val map2=sc.broadcast(map1)

I will get compilation error:
org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,String]]
does not take parameters
[error]  val matchs= Vecs.map(term=>term.map{case (a,b)=>(map2(a),b)})

Seems it's still an RDD, so how to access it by value=map2(key) ? Thanks!

Cheers,
Dan



2015-07-22 2:20 GMT-05:00 Andrew Or :

> Hi Dan,
>
> If the map is small enough, you can just broadcast it, can't you? It
> doesn't have to be an RDD. Here's an example of broadcasting an array and
> using it on the executors:
> https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
> .
>
> -Andrew
>
> 2015-07-21 19:56 GMT-07:00 ayan guha :
>
>> Either you have to do rdd.collect and then broadcast or you can do a join
>> On 22 Jul 2015 07:54, "Dan Dong"  wrote:
>>
>>> Hi, All,
>>>
>>>
>>> I am trying to access a Map from RDDs that are on different compute
>>> nodes, but without success. The Map is like:
>>>
>>> val map1 = Map("aa"->1,"bb"->2,"cc"->3,...)
>>>
>>> All RDDs will have to check against it to see if the key is in the Map
>>> or not, so seems I have to make the Map itself global, the problem is that
>>> if the Map is stored as RDDs and spread across the different nodes, each
>>> node will only see a piece of the Map and the info will not be complete to
>>> check against the Map (and then replace the key with the corresponding
>>> value). E.g.:
>>>
>>> val matchs= Vecs.map(term=>term.map{case (a,b)=>(map1(a),b)})
>>>
>>> But if the Map is not an RDD, how to share it like sc.broadcast(map1)
>>>
>>> Any idea about this? Thanks!
>>>
>>>
>>> Cheers,
>>> Dan
>>>
>>>
>


Re: How to share a Map among RDDS?

2015-07-22 Thread Andrew Or
Hi Dan,

`map2` is a broadcast variable, not your map. To access the map on the
executors you need to do `map2.value(a)`.

-Andrew
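
A minimal sketch of the whole pattern (the map contents are illustrative; Vecs is the
RDD from the thread):

val map1 = Map(1 -> "aa", 2 -> "bb", 3 -> "cc")
val map2 = sc.broadcast(map1)

// On the executors, go through .value to reach the underlying map.
val matchs = Vecs.map(term => term.map { case (a, b) => (map2.value(a), b) })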

2015-07-22 12:20 GMT-07:00 Dan Dong :

> Hi, Andrew,
>   If I broadcast the Map:
> val map2=sc.broadcast(map1)
>
> I will get compilation error:
> org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,String]]
> does not take parameters
> [error]  val matchs= Vecs.map(term=>term.map{case (a,b)=>(map2(a),b)})
>
> Seems it's still an RDD, so how to access it by value=map2(key) ? Thanks!
>
> Cheers,
> Dan
>
>
>
> 2015-07-22 2:20 GMT-05:00 Andrew Or :
>
>> Hi Dan,
>>
>> If the map is small enough, you can just broadcast it, can't you? It
>> doesn't have to be an RDD. Here's an example of broadcasting an array and
>> using it on the executors:
>> https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
>> .
>>
>> -Andrew
>>
>> 2015-07-21 19:56 GMT-07:00 ayan guha :
>>
>>> Either you have to do rdd.collect and then broadcast or you can do a join
>>> On 22 Jul 2015 07:54, "Dan Dong"  wrote:
>>>
 Hi, All,


 I am trying to access a Map from RDDs that are on different compute
 nodes, but without success. The Map is like:

 val map1 = Map("aa"->1,"bb"->2,"cc"->3,...)

 All RDDs will have to check against it to see if the key is in the Map
 or not, so seems I have to make the Map itself global, the problem is that
 if the Map is stored as RDDs and spread across the different nodes, each
 node will only see a piece of the Map and the info will not be complete to
 check against the Map( an then replace the key with the corresponding
 value) E,g:

 val matchs= Vecs.map(term=>term.map{case (a,b)=>(map1(a),b)})

 But if the Map is not an RDD, how to share it like sc.broadcast(map1)

 Any idea about this? Thanks!


 Cheers,
 Dan


>>
>


Re: How to keep RDDs in memory between two different batch jobs?

2015-07-22 Thread harirajaram
I was about to say what the previous post said, so +1 to the previous post.
From my understanding (gut feeling) of your requirement, it is very easy to
do this with spark-job-server.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957p23960.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Need help in SparkSQL

2015-07-22 Thread Jörn Franke
Can you provide an example of an AND query? If you do just look-ups you
should try HBase/Phoenix; otherwise you can try ORC with storage index
and/or compression, but this depends on what your queries look like.

On Wed, 22 Jul 2015 at 14:48, Jeetendra Gangele  wrote:

> HI All,
>
> I have data in MongoDb(few TBs) which I want to migrate to HDFS to do
> complex queries analysis on this data.Queries like AND queries involved
> multiple fields
>
> So my question in which which format I should store the data in HDFS so
> that processing will be fast for such kind of queries?
>
>
> Regards
> Jeetendra
>
>


Re: How to share a Map among RDDS?

2015-07-22 Thread Dan Dong
Thanks Andrew, exactly.

2015-07-22 14:26 GMT-05:00 Andrew Or :

> Hi Dan,
>
> `map2` is a broadcast variable, not your map. To access the map on the
> executors you need to do `map2.value(a)`.
>
> -Andrew
>
> 2015-07-22 12:20 GMT-07:00 Dan Dong :
>
>> Hi, Andrew,
>>   If I broadcast the Map:
>> val map2=sc.broadcast(map1)
>>
>> I will get compilation error:
>> org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,String]]
>> does not take parameters
>> [error]  val matchs= Vecs.map(term=>term.map{case (a,b)=>(map2(a),b)})
>>
>> Seems it's still an RDD, so how to access it by value=map2(key) ? Thanks!
>>
>> Cheers,
>> Dan
>>
>>
>>
>> 2015-07-22 2:20 GMT-05:00 Andrew Or :
>>
>>> Hi Dan,
>>>
>>> If the map is small enough, you can just broadcast it, can't you? It
>>> doesn't have to be an RDD. Here's an example of broadcasting an array and
>>> using it on the executors:
>>> https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
>>> .
>>>
>>> -Andrew
>>>
>>> 2015-07-21 19:56 GMT-07:00 ayan guha :
>>>
 Either you have to do rdd.collect and then broadcast or you can do a
 join
 On 22 Jul 2015 07:54, "Dan Dong"  wrote:

> Hi, All,
>
>
> I am trying to access a Map from RDDs that are on different compute
> nodes, but without success. The Map is like:
>
> val map1 = Map("aa"->1,"bb"->2,"cc"->3,...)
>
> All RDDs will have to check against it to see if the key is in the Map
> or not, so seems I have to make the Map itself global, the problem is that
> if the Map is stored as RDDs and spread across the different nodes, each
> node will only see a piece of the Map and the info will not be complete to
> check against the Map( an then replace the key with the corresponding
> value) E,g:
>
> val matchs= Vecs.map(term=>term.map{case (a,b)=>(map1(a),b)})
>
> But if the Map is not an RDD, how to share it like sc.broadcast(map1)
>
> Any idea about this? Thanks!
>
>
> Cheers,
> Dan
>
>
>>>
>>
>
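
For readers who hit the same thing: a minimal, self-contained sketch of the pattern Andrew describes - broadcast a plain Scala Map once, then read it on the executors through .value. The data and variable names below are made up for illustration.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastMapExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BroadcastMapExample"))

    // A small lookup table that lives on the driver.
    val map1 = Map(1 -> "aa", 2 -> "bb", 3 -> "cc")

    // Ship one read-only copy to every executor.
    val map2 = sc.broadcast(map1)

    // Example data: each record is a sequence of (key, weight) pairs.
    val vecs = sc.parallelize(Seq(Seq((1, 0.5), (2, 0.3)), Seq((3, 0.9))))

    // On the executors, go through the broadcast handle's .value,
    // not through map2(...) directly.
    val matchs = vecs.map(term => term.map { case (a, b) => (map2.value(a), b) })

    matchs.collect().foreach(println)
    sc.stop()
  }
}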


spark.executor.memory and spark.driver.memory have no effect in yarn-cluster mode (1.4.x)?

2015-07-22 Thread Michael Misiewicz
Hi group,

I seem to have encountered a weird problem with 'spark-submit' and manually
setting sparkconf values in my applications.

It seems like setting the configuration values spark.executor.memory
and spark.driver.memory don't have any effect, when they are set from
within my application (i.e. prior to creating a SparkContext).

In yarn-cluster mode, only the values specified on the command line via
spark-submit for driver and executor memory are respected, and if not, it
appears spark falls back to defaults. For example,

Correct behavior noted in Driver's logs on YARN when --executor-memory is
specified:

15/07/22 19:25:59 INFO yarn.YarnAllocator: Will request 200 executor
containers, each with 1 cores and 13824 MB memory including 1536 MB
overhead
15/07/22 19:25:59 INFO yarn.YarnAllocator: Container request (host:
Any, capability: )


But not when spark.executor.memory is specified prior to spark context
initialization:

15/07/22 19:22:22 INFO yarn.YarnAllocator: Will request 200 executor
containers, each with 1 cores and 2560 MB memory including 1536 MB
overhead
15/07/22 19:22:22 INFO yarn.YarnAllocator: Container request (host:
Any, capability: )


In both cases, executor mem should be 10g. Interestingly, I set a
parameter spark.yarn.executor.memoryOverhead which appears to be
respected whether or not I'm in yarn-cluster or yarn-client mode.


Has anyone seen this before? Any idea what might be causing this behavior?


Re: Performance issue with Spark's foreachPartition method

2015-07-22 Thread diplomatic Guru
Thanks Robin for your reply.

I'm pretty sure that writing to Oracle is what takes longer, as writing to
HDFS only takes ~5 minutes.

The job is writing about ~5 million records. I've set the job to call
executeBatch() when the batch size reaches 200,000 records, so I assume
that commit will be invoked for every 200K batch. In this case, it should
only call commit 25 times; is this too much? I wouldn't want to increase
the batch size any further as it may cause Java heap issues. I do not have
much knowledge on the Oracle side, so any advice on the configuration would
be appreciated.

Thanks,

Raj





On 22 July 2015 at 20:20, Robin East  wrote:

> The first question I would ask is have you determined whether you have a
> performance issue writing to Oracle? In particular how many commits are you
> making? If you are issuing a lot of commits that would be a performance
> problem.
>
> Robin
>
> On 22 Jul 2015, at 19:11, diplomatic Guru 
> wrote:
>
> Hello all,
>
> We are having a major performance issue with the Spark, which is holding
> us from going live.
>
> We have a job that carries out computation on log files and write the
> results into Oracle DB.
>
> The reducer 'reduceByKey'  have been set to parallelize by 4 as we don't
> want to establish too many DB connections.
>
> We are then calling the foreachPartition on the RDD pairs that were
> reduced by the key. Within this foreachPartition method we establish DB
> connection, then iterate the results, prepare the Oracle statement for
> batch insertion then we commit the batch and close the connection. All
> these are working fine.
>
> However, when we execute the job to process 12GB of data, it takes forever
> to complete, especially at the foreachPartition stage.
>
> We submitted the job with 6 executors, 2 cores, and 6GB memory of which
> 0.3 is assigned to spark.storage.memoryFraction.
>
> The job is taking about 50 minutes to complete, which is not ideal. I'm
> not sure how we could enhance the performance. I've provided the main body
> of the codes, please take a look and advice:
>
> From Driver:
>
> reduceResultsRDD.foreachPartition(new DB.InsertFunction(dbuser, dbpass, batchsize));
>
>
> DB class:
>
> public class DB {
>
>   private static final Logger logger = LoggerFactory.getLogger(DB.class);
>
>   public static class InsertFunction implements
>           VoidFunction<Iterator<Tuple2<String, String>>> {
>
>       private static final long serialVersionUID = 55766876878L;
>       private String dbuser = "";
>       private String dbpass = "";
>       private int batchsize;
>
>       public InsertFunction(String dbuser, String dbpass, int batchsize) {
>           super();
>           this.dbuser = dbuser;
>           this.dbpass = dbpass;
>           this.batchsize = batchsize;
>       }
>
>       @Override
>       public void call(Iterator<Tuple2<String, String>> results) {
>           Connection connect = null;
>           PreparedStatement pstmt = null;
>           try {
>               connect = getDBConnection(dbuser, dbpass);
>
>               int count = 0;
>
>               if (batchsize <= 0) {
>                   batchsize = 1;
>               }
>
>               pstmt = connect.prepareStatement(
>                       "MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
>
>               while (results.hasNext()) {
>
>                   Tuple2<String, String> kv = results.next();
>                   String[] data = kv._1.concat("," + kv._2).split(",");
>
>                   pstmt.setString(1, data[0].toString());
>                   pstmt.setString(2, data[1].toString());
>                   ...
>
>                   pstmt.addBatch();
>
>                   count++;
>
>                   if (count == batchsize) {
>                       logger.info("BulkCount : " + count);
>                       pstmt.executeBatch();
>                       connect.commit();
>                       count = 0;
>                   }
>
>                   pstmt.executeBatch();
>                   connect.commit();
>
>               }
>
>               pstmt.executeBatch();
>               connect.commit();
>
>           } catch (Exception e) {
>               logger.error("InsertFunction error: " + e.getMessage());
>           } finally {
>
>               if (pstmt != null) {
>                   pstmt.close();
>               }
>
>               try {
>                   connect.close();
>               } catch (SQLException e) {
>                   logger.error("InsertFunction Connection Close error: "
>                           + e.getMessage());
>               }
>           }
>       }
>
>   }
> }
>
>
>
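
For comparison with the code above, here is a minimal Scala sketch of a per-partition JDBC writer that flushes and commits only when a batch is full, plus once for the final partial batch. The JDBC URL, table and column names are placeholders, and the sketch assumes the JDBC driver jar is on the executor classpath.

import java.sql.DriverManager

import org.apache.spark.rdd.RDD

object JdbcBatchWriter {
  // Writes (key, value) pairs, committing once per `batchSize` rows
  // and once more for the last partial batch of each partition.
  def write(pairs: RDD[(String, String)], url: String, batchSize: Int): Unit = {
    pairs.foreachPartition { rows =>
      val conn = DriverManager.getConnection(url)            // placeholder URL
      conn.setAutoCommit(false)
      val stmt = conn.prepareStatement(
        "INSERT INTO some_table (col1, col2) VALUES (?, ?)") // placeholder SQL
      try {
        var count = 0
        rows.foreach { case (k, v) =>
          stmt.setString(1, k)
          stmt.setString(2, v)
          stmt.addBatch()
          count += 1
          if (count == batchSize) {  // flush and commit per full batch only
            stmt.executeBatch()
            conn.commit()
            count = 0
          }
        }
        stmt.executeBatch()          // flush whatever is left
        conn.commit()
      } finally {
        stmt.close()
        conn.close()
      }
    }
  }
}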


databricks spark sql csv FAILFAST not failing, Spark 1.3.1 Java 7

2015-07-22 Thread Adam Pritchard
Hi all,

I am using the databricks csv library to load some data into a data frame.
https://github.com/databricks/spark-csv


I am trying to confirm that FAILFAST mode works correctly and aborts
execution upon receiving an invalid csv file, but I have not been able to
see it fail after testing numerous invalid csv files. Any advice?

spark 1.3.1 running on mapr vm 4.1.0 java 1.7


SparkConf conf = new SparkConf().setAppName("Dataframe testing");

JavaSparkContext sc = new JavaSparkContext(conf);


SQLContext sqlContext = new SQLContext(sc);
HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", args[0]);
options.put("mode", "FAILFAST");
//partner data
DataFrame partnerData = sqlContext.load("com.databricks.spark.csv", options);
//register partnerData table in spark sql
partnerData.registerTempTable("partnerData");

partnerData.printSchema();
partnerData.show();


It just runs like normal, and will output the data, even with an invalid
csv file.


Thanks!


Spark DataFrame created from JavaRDD copies all columns data into first column

2015-07-22 Thread unk1102
Hi, I have a DataFrame which I need to convert into a JavaRDD<Row> and back to
a DataFrame. I have the following code:

DataFrame sourceFrame =
hiveContext.read().format("orc").load("/path/to/orc/file");
//I do order by in above sourceFrame and then I convert it into JavaRDD
JavaRDD<Row> modifiedRDD = sourceFrame.toJavaRDD().map(new Function<Row, Row>() {
    public Row call(Row row) throws Exception {
        if (row != null) {
            //updated row by creating new Row
            return RowFactory.create(updateRow);
        }
        return null;
    }
});
//now I convert above JavaRDD into DataFrame using the following
DataFrame modifiedFrame = sqlContext.createDataFrame(modifiedRDD, schema);

The sourceFrame and modifiedFrame schemas are the same. When I call
sourceFrame.show() the output is as expected: every column has its
corresponding values and no column is empty. But when I call
modifiedFrame.show() I see all the column values merged into the first
column. For example, assume the source DataFrame has 3 columns as shown below:

_col1  _col2  _col3
 ABC    10    DEF
 GHI    20    JKL

When I print modifiedFrame, which I converted from the JavaRDD, it shows in the
following form:

_col1       _col2  _col3
ABC,10,DEF
GHI,20,JKL

As shown above, _col1 has all the values and _col2 and _col3 are empty. I don't
know what I am doing wrong, please guide me; I am new to Spark. Thanks in
advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrame-created-from-JavaRDD-Row-copies-all-columns-data-into-first-column-tp23961.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark.executor.memory and spark.driver.memory have no effect in yarn-cluster mode (1.4.x)?

2015-07-22 Thread Andrew Or
Hi Michael,

In general, driver related properties should not be set through the
SparkConf. This is because by the time the SparkConf is created, we have
already started the driver JVM, so it's too late to change the memory,
class paths and other properties.

In cluster mode, executor related properties should also not be set through
the SparkConf. This is because the driver is run on the cluster just like
the executors, and the executors are launched independently by whatever the
cluster manager (e.g. YARN) is configured to do.

The recommended way of setting these properties is either through the
conf/spark-defaults.conf properties file, or through the spark-submit
command line, e.g.:

bin/spark-shell --master yarn --executor-memory 2g --driver-memory 5g

Let me know if that answers your question,
-Andrew


2015-07-22 12:38 GMT-07:00 Michael Misiewicz :

> Hi group,
>
> I seem to have encountered a weird problem with 'spark-submit' and
> manually setting sparkconf values in my applications.
>
> It seems like setting the configuration values spark.executor.memory
> and spark.driver.memory don't have any effect, when they are set from
> within my application (i.e. prior to creating a SparkContext).
>
> In yarn-cluster mode, only the values specified on the command line via
> spark-submit for driver and executor memory are respected, and if not, it
> appears spark falls back to defaults. For example,
>
> Correct behavior noted in Driver's logs on YARN when --executor-memory is
> specified:
>
> 15/07/22 19:25:59 INFO yarn.YarnAllocator: Will request 200 executor 
> containers, each with 1 cores and 13824 MB memory including 1536 MB overhead
> 15/07/22 19:25:59 INFO yarn.YarnAllocator: Container request (host: Any, 
> capability: )
>
>
> But not when spark.executor.memory is specified prior to spark context 
> initialization:
>
> 15/07/22 19:22:22 INFO yarn.YarnAllocator: Will request 200 executor 
> containers, each with 1 cores and 2560 MB memory including 1536 MB overhead
> 15/07/22 19:22:22 INFO yarn.YarnAllocator: Container request (host: Any, 
> capability: )
>
>
> In both cases, executor mem should be 10g. Interestingly, I set a parameter 
> spark.yarn.executor.memoryOverhead which appears to be respected whether or 
> not I'm in yarn-cluster or yarn-client mode.
>
>
> Has anyone seen this before? Any idea what might be causing this behavior?
>
>
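
For completeness, the spark-defaults.conf form of the same settings might look like this (the sizes are only an example):

# conf/spark-defaults.conf
spark.driver.memory                  10g
spark.executor.memory                10g
spark.yarn.executor.memoryOverhead   1536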


Re: spark.deploy.spreadOut core allocation

2015-07-22 Thread Srikanth
Cool. Thanks!

Srikanth

On Wed, Jul 22, 2015 at 3:12 PM, Andrew Or  wrote:

> Hi Srikanth,
>
> I was able to reproduce the issue by setting `spark.cores.max` to a number
> greater than the number of cores on a worker. I've filed SPARK-9260 which I
> believe is already being fixed in
> https://github.com/apache/spark/pull/7274.
>
> Thanks for reporting the issue!
> -Andrew
>
> 2015-07-22 11:49 GMT-07:00 Andrew Or :
>
>> Hi Srikanth,
>>
>> It does look like a bug. Did you set `spark.executor.cores` in your
>> application by any chance?
>>
>> -Andrew
>>
>> 2015-07-22 8:05 GMT-07:00 Srikanth :
>>
>>> Hello,
>>>
>>> I've set spark.deploy.spreadOut=false in spark-env.sh.
>>>
 export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4
 -Dspark.deploy.spreadOut=false"
>>>
>>>
>>> There are 3 workers each with 4 cores. Spark-shell was started with noof
>>> cores = 6.
>>> Spark UI show that one executor was used with 6 cores.
>>>
>>> Is this a bug? This is with Spark 1.4.
>>>
>>> [image: Inline image 1]
>>>
>>> Srikanth
>>>
>>
>>
>


Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

2015-07-22 Thread Eugene Morozov
Hi, 

I’m stuck with the same issue, but I see
org.apache.hadoop.fs.s3native.NativeS3FileSystem in hadoop-core:1.0.4
(that’s the current hadoop-client I use), and so far this is a transitive dependency
that comes from Spark itself. I’m using a custom build of Spark 1.3.1 with
hadoop-client 1.0.4.

[INFO] +- 
org.apache.spark:spark-core_2.10:jar:1.3.1-hadoop-client-1.0.4:provided
...
[INFO] |  +- org.apache.hadoop:hadoop-client:jar:1.0.4:provided
[INFO] |  |  \- org.apache.hadoop:hadoop-core:jar:1.0.4:provided

The thing is I don’t have any direct usage of any hadoop-client version, so in
my understanding I should be able to run my jar on any version of Spark (1.3.1
with hadoop-client 2.2.0 up to 2.2.6, or 1.3.1 with hadoop-client 1.0.4 up to
1.2.1), but in reality, running it on a live cluster I’m getting a
ClassNotFoundException. I’ve checked the über-jar of Spark itself, and
NativeS3FileSystem is there, so I don’t really understand where the problem comes from.

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)


I’ve just got an idea. Is it possible that the Executor’s classpath is different
from the Worker’s classpath? How can I check the Executor’s classpath?

On 23 Apr 2015, at 17:35, Ted Yu  wrote:

> NativeS3FileSystem class is in hadoop-aws jar.
> Looks like it was not on classpath.
> 
> Cheers
> 
> On Thu, Apr 23, 2015 at 7:30 AM, Sujee Maniyam  wrote:
> Thanks all...
> 
> btw, s3n load works without any issues with  spark-1.3.1-bulit-for-hadoop 2.4 
> 
> I tried this on 1.3.1-hadoop26
> >  sc.hadoopConfiguration.set("fs.s3n.impl", 
> > "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> > val f = sc.textFile("s3n://bucket/file")
> > f.count
> 
> No it can't find the implementation path.  Looks like some jar is missing ?
> 
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
>   at 
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
>   at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
>   at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>   at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> 
> On Wednesday, April 22, 2015, Shuai Zheng  wrote:
> Below is my code to access s3n without problem (only for 1.3.1. there is a 
> bug in 1.3.0).
> 
>  
> 
>   Configuration hadoopConf = ctx.hadoopConfiguration();
> 
>   hadoopConf.set("fs.s3n.impl", 
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
> 
>   hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
> 
>   hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);
> 
>  
> 
> Regards,
> 
>  
> 
> Shuai
> 
>  
> 
> From: Sujee Maniyam [mailto:su...@sujee.net] 
> Sent: Wednesday, April 22, 2015 12:45 PM
> To: Spark User List
> Subject: spark 1.3.1 : unable to access s3n:// urls (no file system for 
> scheme s3n:)
> 
>  
> 
> Hi all
> 
> I am unable to access s3n://  urls using   sc.textFile().. getting 'no file 
> system for scheme s3n://'  error.
> 
>  
> 
> a bug or some conf settings missing?
> 
>  
> 
> See below for details:
> 
>  
> 
> env variables : 
> 
> AWS_SECRET_ACCESS_KEY=set
> 
> AWS_ACCESS_KEY_ID=set
> 
>  
> 
> spark/RELAESE :
> 
> Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0
> 
> Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
> -Pyarn -DzincPort=3034
> 
>  
> 
>  
> 
> ./bin/spark-shell
> 
> > val f = sc.textFile("s3n://bucket/file")
> 
> > f.count
> 
>  
> 
> error==> 
> 
> java.io.IOException: No FileSystem for scheme: s3n
> 
> at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
> 
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> 
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> 
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
> 
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
> 
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
> 
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> 
> at 
> org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
> 
> at 
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
> 
> at 
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
> 
> at 
> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
> 
> at 
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> 
> at 
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> 
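
One way to answer that last question from a spark-shell: run a small job that reports each executor JVM's java.class.path, and whether the class in question can be loaded there. This is only a diagnostic sketch.

import scala.util.Try

// Classpath as seen inside the executor JVMs.
val executorClasspaths = sc.parallelize(1 to 100, 10)
  .map(_ => System.getProperty("java.class.path"))
  .distinct()
  .collect()
executorClasspaths.foreach(println)

// Can the executors actually load NativeS3FileSystem?
val canLoad = sc.parallelize(1 to 100, 10)
  .map(_ => Try(Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem")).isSuccess)
  .distinct()
  .collect()
println(canLoad.mkString(", "))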

Re: spark.executor.memory and spark.driver.memory have no effect in yarn-cluster mode (1.4.x)?

2015-07-22 Thread Michael Misiewicz
That makes a lot of sense, thanks for the concise answer!

On Wed, Jul 22, 2015 at 4:10 PM, Andrew Or  wrote:

> Hi Michael,
>
> In general, driver related properties should not be set through the
> SparkConf. This is because by the time the SparkConf is created, we have
> already started the driver JVM, so it's too late to change the memory,
> class paths and other properties.
>
> In cluster mode, executor related properties should also not be set
> through the SparkConf. This is because the driver is run on the cluster
> just like the executors, and the executors are launched independently by
> whatever the cluster manager (e.g. YARN) is configured to do.
>
> The recommended way of setting these properties is either through the
> conf/spark-defaults.conf properties file, or through the spark-submit
> command line, e.g.:
>
> bin/spark-shell --master yarn --executor-memory 2g --driver-memory 5g
>
> Let me know if that answers your question,
> -Andrew
>
>
> 2015-07-22 12:38 GMT-07:00 Michael Misiewicz :
>
>> Hi group,
>>
>> I seem to have encountered a weird problem with 'spark-submit' and
>> manually setting sparkconf values in my applications.
>>
>> It seems like setting the configuration values spark.executor.memory
>> and spark.driver.memory don't have any effect, when they are set from
>> within my application (i.e. prior to creating a SparkContext).
>>
>> In yarn-cluster mode, only the values specified on the command line via
>> spark-submit for driver and executor memory are respected, and if not, it
>> appears spark falls back to defaults. For example,
>>
>> Correct behavior noted in Driver's logs on YARN when --executor-memory is
>> specified:
>>
>> 15/07/22 19:25:59 INFO yarn.YarnAllocator: Will request 200 executor 
>> containers, each with 1 cores and 13824 MB memory including 1536 MB overhead
>> 15/07/22 19:25:59 INFO yarn.YarnAllocator: Container request (host: Any, 
>> capability: )
>>
>>
>> But not when spark.executor.memory is specified prior to spark context 
>> initialization:
>>
>> 15/07/22 19:22:22 INFO yarn.YarnAllocator: Will request 200 executor 
>> containers, each with 1 cores and 2560 MB memory including 1536 MB overhead
>> 15/07/22 19:22:22 INFO yarn.YarnAllocator: Container request (host: Any, 
>> capability: )
>>
>>
>> In both cases, executor mem should be 10g. Interestingly, I set a parameter 
>> spark.yarn.executor.memoryOverhead which appears to be respected whether or 
>> not I'm in yarn-cluster or yarn-client mode.
>>
>>
>> Has anyone seen this before? Any idea what might be causing this behavior?
>>
>>
>


Re: How to keep RDDs in memory between two different batch jobs?

2015-07-22 Thread Haoyuan Li
Yes. Tachyon can handle this well: http://tachyon-project.org/

Best,

Haoyuan

On Wed, Jul 22, 2015 at 10:56 AM, swetha  wrote:

> Hi,
>
> We have a requirement wherein we need to keep RDDs in memory between Spark
> batch processing that happens every one hour. The idea here is to have RDDs
> that have active user sessions in memory between two jobs so that once a
> job
> processing is  done and another job is run after an hour the RDDs with
> active sessions are still available for joining with those in the current
> job. So, what do we need to keep the data in memory in between two batch
> jobs? Can we use Tachyon?
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Haoyuan Li
CEO, Tachyon Nexus 
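
A rough sketch of the two usual options in the Spark 1.4 era, assuming a Tachyon master is already running and configured for the cluster (the host, port and paths below are placeholders):

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Option 1: off-heap caching (backed by Tachyon in Spark 1.4). This keeps the
// blocks outside the executor heaps, but does not survive the end of the app.
def cacheOffHeap(sc: SparkContext): Unit = {
  val sessions = sc.textFile("hdfs:///logs/sessions")
  sessions.persist(StorageLevel.OFF_HEAP)
  sessions.count()  // materialize the cache
}

// Option 2: write the RDD out to Tachyon at the end of job A and read it back
// at the start of job B an hour later.
def handOff(sc: SparkContext): Unit = {
  val active = sc.textFile("hdfs:///logs/sessions")
  active.saveAsObjectFile("tachyon://tachyon-master:19998/active-sessions")

  val restored = sc.objectFile[String]("tachyon://tachyon-master:19998/active-sessions")
  restored.count()
}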


spark-submit and spark-shell behaviors mismatch.

2015-07-22 Thread Dan Dong
Hi,

  I have a simple test spark program as below, the strange thing is that it
runs well under a spark-shell, but will get a runtime error of

java.lang.NoSuchMethodError:

in spark-submit, which indicate the line of:

val maps2=maps.collect.toMap

has problem. But why the compilation has no problem and it works well under
spark-shell(==>maps2: scala.collection.immutable.Map[Int,String] =
Map(269953 -> once, 97 -> a, 451002 -> upon, 117481 -> was, 226916 ->
there, 414413 -> time, 146327 -> king) )? Thanks!

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark._
import SparkContext._


val docs=sc.parallelize(Array(Array("once" ,"upon", "a", "time"),
Array("there", "was", "a", "king")))

val hashingTF = new HashingTF()

val maps=docs.flatMap{term=>term.map(ele=>(hashingTF.indexOf(ele),ele))}

val maps2=maps.collect.toMap


Cheers,

Dan


RE: Need help in SparkSQL

2015-07-22 Thread Mohammed Guller
Parquet

Mohammed

From: Jeetendra Gangele [mailto:gangele...@gmail.com]
Sent: Wednesday, July 22, 2015 5:48 AM
To: user
Subject: Need help in SparkSQL

HI All,

I have data in MongoDb(few TBs) which I want to migrate to HDFS to do complex 
queries analysis on this data.Queries like AND queries involved multiple fields

So my question in which which format I should store the data in HDFS so that 
processing will be fast for such kind of queries?


Regards
Jeetendra
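
A minimal sketch of what that can look like in Spark 1.4: write the migrated records to HDFS as Parquet once, then run multi-field AND filters against it. The field names here are invented for illustration.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object ParquetAndQueries {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ParquetAndQueries"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // One-off migration step: whatever DataFrame is built from the MongoDB
    // export gets written to HDFS as Parquet.
    val visits = Seq(
      ("u1", "1BHK", "areaA", 1437570000L),
      ("u2", "2BHK", "areaB", 1437573600L)
    ).toDF("userId", "flatType", "area", "ts")
    visits.write.parquet("hdfs:///warehouse/visits.parquet")

    // Later: columnar scans with AND predicates on multiple fields.
    val df = sqlContext.read.parquet("hdfs:///warehouse/visits.parquet")
    df.registerTempTable("visits")
    sqlContext.sql(
      """SELECT COUNT(DISTINCT userId)
        |FROM visits
        |WHERE flatType = '1BHK' AND area = 'areaA' AND ts > 1437566400
      """.stripMargin).show()

    sc.stop()
  }
}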



Issue with column named "count" in a DataFrame

2015-07-22 Thread Young, Matthew T
I'm trying to do some simple counting and aggregation in an IPython notebook 
with Spark 1.4.0 and I have encountered behavior that looks like a bug.

When I try to filter rows out of an RDD with a column name of count I get a 
large error message. I would just avoid naming things count, except for the 
fact that this is the default column name created with the count() operation in 
pyspark.sql.GroupedData

The small example program below demonstrates the issue.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
dataFrame = sc.parallelize([("foo",), ("foo",), ("bar",)]).toDF(["title"])
counts = dataFrame.groupBy('title').count()
counts.filter("title = 'foo'").show() # Works
counts.filter("count > 1").show() # Errors out


I can even reproduce the issue in a PySpark shell session by entering these 
commands.

I suspect that the error has something to with Spark wanting to call the 
count() function in place of looking at the count column.

The error message is as follows:


Py4JJavaError Traceback (most recent call last)
 in ()
> 1 counts.filter("count > 1").show() # Errors Out

C:\Users\User\Downloads\spark-1.4.0-bin-hadoop2.6\python\pyspark\sql\dataframe.pyc
 in filter(self, condition)
774 """
775 if isinstance(condition, basestring):
--> 776 jdf = self._jdf.filter(condition)
777 elif isinstance(condition, Column):
778 jdf = self._jdf.filter(condition._jc)

C:\Python27\lib\site-packages\py4j\java_gateway.pyc in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:

C:\Python27\lib\site-packages\py4j\protocol.pyc in get_return_value(answer, 
gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o229.filter.
: java.lang.RuntimeException: [1.7] failure: ``('' expected but `>' found

count > 1
  ^
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.SqlParser.parseExpression(SqlParser.scala:45)
at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:652)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Unknown Source)



Is there a recommended workaround to the inability to filter on a column named 
count? Do I have to make a new DataFrame and rename the column just to work 
around this bug? What's the best way to do that?

Thanks,

-- Matthew Young

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



No suitable driver found for jdbc:mysql://

2015-07-22 Thread roni
Hi All,
 I have a cluster with spark 1.4.
I am trying to save data to mysql but getting error

Exception in thread "main" java.sql.SQLException: No suitable driver found
for jdbc:mysql://<>.rds.amazonaws.com:3306/DAE_kmer?user=<>&password=<>


*I looked at - https://issues.apache.org/jira/browse/SPARK-8463
 and added the connector
jar to the same location as on Master using copy-dir script.*

*But I am still getting the same error. This used to work with 1.3.*

*This is my command  to run the program - **$SPARK_HOME/bin/spark-submit
--jars
/root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar
--conf
spark.executor.extraClassPath=/root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar
--conf spark.executor.memory=55g --driver-memory=55g
--master=spark://ec2-52-25-191-999.us-west-2.compute.amazonaws.com:7077
  --class
"saveBedToDB"  target/scala-2.10/adam-project_2.10-1.0.jar*

*What else can I Do ?*

*Thanks*

*-Roni*


Re: Spark SQL Table Caching

2015-07-22 Thread Pedro Rodriguez
I would be interested in the answer to this question, plus the relationship
between those and registerTempTable()

Pedro

On Tue, Jul 21, 2015 at 1:59 PM, Brandon White 
wrote:

> A few questions about caching a table in Spark SQL.
>
> 1) Is there any difference between caching the dataframe and the table?
>
> df.cache() vs sqlContext.cacheTable("tableName")
>
> 2) Do you need to "warm up" the cache before seeing the performance
> benefits? Is the cache LRU? Do you need to run some queries on the table
> before it is cached in memory?
>
> 3) Is caching the table much faster than .saveAsTable? I am only seeing a
> 10 %- 20% performance increase.
>



-- 
Pedro Rodriguez
UCBerkeley 2014 | Computer Science
SnowGeek 
pedro-rodriguez.com
ski.rodrig...@gmail.com
208-340-1703


Using Wurfl in Spark

2015-07-22 Thread Zhongxiao Ma
Hi all,

I am trying to do WURFL lookups in a Spark cluster and getting exceptions; I am
pretty sure the same thing works at small scale, but it fails when I try to do
it in Spark. I already used spark-ec2/copy-dir to copy the WURFL library to the
workers, and launched the spark-shell with the --jars parameter including WURFL
and its dependencies in the lib/ directory.

To reconstruct the error, let's say that I already have a userAgentRdd, which is
an RDD[String], and a userAgentSample of Array[String]. I am trying to reuse the
WURFL engine by doing mapPartitions so I can save the time of reloading it.

import net.sourceforge.wurfl.core.GeneralWURFLEngine

def lookupModel(wurfl: GeneralWURFLEngine)(userAgent: String) = {
  val device = wurfl.getDeviceForRequest(userAgent)
  val brand = device.getCapability("brand_name")
  val model = device.getCapability("model_name")
  (brand, model)
}

def lookupModelPartitions(wurflXmlPath: String)(userAgentIterator: 
Iterator[String]) = {
  val wurfl = new GeneralWURFLEngine(wurflXmlPath)
  wurfl.setEngineTarget(EngineTarget.accuracy)
  userAgentIterator.map(lookupModel(wurfl))
}

// the following will work
val wurflEngine = new 
GeneralWURFLEngine("/root/wurfl-1.6.1.0-release/wurfl.zip")
val userAgentSample = // my local dataset
val modelSample = userAgentSample.map(lookupModel(wurflEngine))

// the following will also work
val userAgentRdd = // my spark dataset
val modelRdd = 
userAgentRdd.mapPartitions(lookupModelPartitions("/root/wurfl-1.6.1.0-release/wurfl.zip”))
modelRdd.take(10)

// but the following will not work
modelRdd.count

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 
491, 10.128.224.227): net.sourceforge.wurfl.core.exc.WURFLRuntimeException: 
WURFL unexpected exception
at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.initIfNeeded(GeneralWURFLEngine.java:286)
at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.getDeviceForRequest(GeneralWURFLEngine.java:425)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.lookupModel(:23)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$lookupModelPartitions$1.apply(:27)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$lookupModelPartitions$1.apply(:27)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1628)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: net.sourceforge.wurfl.core.resource.exc.WURFLResourceException: 
WURFL unexpected exception
at 
net.sourceforge.wurfl.core.resource.XMLResource.readData(XMLResource.java:350)
at 
net.sourceforge.wurfl.core.resource.XMLResource.getData(XMLResource.java:154)
at 
net.sourceforge.wurfl.core.resource.DefaultWURFLModel.init(DefaultWURFLModel.java:118)
at 
net.sourceforge.wurfl.core.resource.DefaultWURFLModel.(DefaultWURFLModel.java:110)
at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.init(GeneralWURFLEngine.java:304)
at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.initIfNeeded(GeneralWURFLEngine.java:283)
... 16 more
Caused by: net.sourceforge.wurfl.core.resource.exc.WURFLParsingException: The 
devices with id generic define more is_wireless_device
at 
net.sourceforge.wurfl.core.resource.XMLResource$WURFLSAXHandler.startCapabilityElement(XMLResource.java:680)
at 
net.sourceforge.wurfl.core.resource.XMLResource$WURFLSAXHandler.startElement(XMLResource.java:534)
at 
com.sun.org.apache.xerces.internal.parsers.AbstractSAXParser.startElement(AbstractSAXParser.java:509)
at 
com.sun.org.apache.xerces.internal.parsers.AbstractXMLDocumentParser.emptyElement(AbstractXMLDocumentParser.java:182)
at 
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanStartElement(XMLDocumentFragmentScannerImpl.java:1343)
at 
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl$FragmentContentDriver.next(XMLDocumentFragmentScannerImpl.java:2786)
at 
com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:606)
at 
com.sun.org.apache.xerc

Re: Issue with column named "count" in a DataFrame

2015-07-22 Thread Michael Armbrust
I believe this will be fixed in Spark 1.5

https://github.com/apache/spark/pull/7237

On Wed, Jul 22, 2015 at 3:04 PM, Young, Matthew T  wrote:

> I'm trying to do some simple counting and aggregation in an IPython
> notebook with Spark 1.4.0 and I have encountered behavior that looks like a
> bug.
>
> When I try to filter rows out of an RDD with a column name of count I get
> a large error message. I would just avoid naming things count, except for
> the fact that this is the default column name created with the count()
> operation in pyspark.sql.GroupedData
>
> The small example program below demonstrates the issue.
>
> from pyspark.sql import SQLContext
> sqlContext = SQLContext(sc)
> dataFrame = sc.parallelize([("foo",), ("foo",), ("bar",)]).toDF(["title"])
> counts = dataFrame.groupBy('title').count()
> counts.filter("title = 'foo'").show() # Works
> counts.filter("count > 1").show() # Errors out
>
>
> I can even reproduce the issue in a PySpark shell session by entering
> these commands.
>
> I suspect that the error has something to with Spark wanting to call the
> count() function in place of looking at the count column.
>
> The error message is as follows:
>
>
> Py4JJavaError Traceback (most recent call last)
>  in ()
> > 1 counts.filter("count > 1").show() # Errors Out
>
> C:\Users\User\Downloads\spark-1.4.0-bin-hadoop2.6\python\pyspark\sql\dataframe.pyc
> in filter(self, condition)
> 774 """
> 775 if isinstance(condition, basestring):
> --> 776 jdf = self._jdf.filter(condition)
> 777 elif isinstance(condition, Column):
> 778 jdf = self._jdf.filter(condition._jc)
>
> C:\Python27\lib\site-packages\py4j\java_gateway.pyc in __call__(self,
> *args)
> 536 answer = self.gateway_client.send_command(command)
> 537 return_value = get_return_value(answer,
> self.gateway_client,
> --> 538 self.target_id, self.name)
> 539
> 540 for temp_arg in temp_args:
>
> C:\Python27\lib\site-packages\py4j\protocol.pyc in
> get_return_value(answer, gateway_client, target_id, name)
> 298 raise Py4JJavaError(
> 299 'An error occurred while calling {0}{1}{2}.\n'.
> --> 300 format(target_id, '.', name), value)
> 301 else:
> 302 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o229.filter.
> : java.lang.RuntimeException: [1.7] failure: ``('' expected but `>' found
>
> count > 1
>   ^
> at scala.sys.package$.error(package.scala:27)
> at
> org.apache.spark.sql.catalyst.SqlParser.parseExpression(SqlParser.scala:45)
> at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:652)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> at java.lang.reflect.Method.invoke(Unknown Source)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:259)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Unknown Source)
>
>
>
> Is there a recommended workaround to the inability to filter on a column
> named count? Do I have to make a new DataFrame and rename the column just
> to work around this bug? What's the best way to do that?
>
> Thanks,
>
> -- Matthew Young
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Issue with column named "count" in a DataFrame

2015-07-22 Thread Michael Armbrust
Additionally have you tried enclosing count in `backticks`?

On Wed, Jul 22, 2015 at 4:25 PM, Michael Armbrust 
wrote:

> I believe this will be fixed in Spark 1.5
>
> https://github.com/apache/spark/pull/7237
>
> On Wed, Jul 22, 2015 at 3:04 PM, Young, Matthew T <
> matthew.t.yo...@intel.com> wrote:
>
>> I'm trying to do some simple counting and aggregation in an IPython
>> notebook with Spark 1.4.0 and I have encountered behavior that looks like a
>> bug.
>>
>> When I try to filter rows out of an RDD with a column name of count I get
>> a large error message. I would just avoid naming things count, except for
>> the fact that this is the default column name created with the count()
>> operation in pyspark.sql.GroupedData
>>
>> The small example program below demonstrates the issue.
>>
>> from pyspark.sql import SQLContext
>> sqlContext = SQLContext(sc)
>> dataFrame = sc.parallelize([("foo",), ("foo",), ("bar",)]).toDF(["title"])
>> counts = dataFrame.groupBy('title').count()
>> counts.filter("title = 'foo'").show() # Works
>> counts.filter("count > 1").show() # Errors out
>>
>>
>> I can even reproduce the issue in a PySpark shell session by entering
>> these commands.
>>
>> I suspect that the error has something to with Spark wanting to call the
>> count() function in place of looking at the count column.
>>
>> The error message is as follows:
>>
>>
>> Py4JJavaError Traceback (most recent call
>> last)
>>  in ()
>> > 1 counts.filter("count > 1").show() # Errors Out
>>
>> C:\Users\User\Downloads\spark-1.4.0-bin-hadoop2.6\python\pyspark\sql\dataframe.pyc
>> in filter(self, condition)
>> 774 """
>> 775 if isinstance(condition, basestring):
>> --> 776 jdf = self._jdf.filter(condition)
>> 777 elif isinstance(condition, Column):
>> 778 jdf = self._jdf.filter(condition._jc)
>>
>> C:\Python27\lib\site-packages\py4j\java_gateway.pyc in __call__(self,
>> *args)
>> 536 answer = self.gateway_client.send_command(command)
>> 537 return_value = get_return_value(answer,
>> self.gateway_client,
>> --> 538 self.target_id, self.name)
>>
>> 539
>> 540 for temp_arg in temp_args:
>>
>> C:\Python27\lib\site-packages\py4j\protocol.pyc in
>> get_return_value(answer, gateway_client, target_id, name)
>> 298 raise Py4JJavaError(
>> 299 'An error occurred while calling
>> {0}{1}{2}.\n'.
>> --> 300 format(target_id, '.', name), value)
>> 301 else:
>> 302 raise Py4JError(
>>
>> Py4JJavaError: An error occurred while calling o229.filter.
>> : java.lang.RuntimeException: [1.7] failure: ``('' expected but `>' found
>>
>> count > 1
>>   ^
>> at scala.sys.package$.error(package.scala:27)
>> at
>> org.apache.spark.sql.catalyst.SqlParser.parseExpression(SqlParser.scala:45)
>> at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:652)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>> at java.lang.reflect.Method.invoke(Unknown Source)
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>> at
>> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>> at py4j.Gateway.invoke(Gateway.java:259)
>> at
>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>> at java.lang.Thread.run(Unknown Source)
>>
>>
>>
>> Is there a recommended workaround to the inability to filter on a column
>> named count? Do I have to make a new DataFrame and rename the column just
>> to work around this bug? What's the best way to do that?
>>
>> Thanks,
>>
>> -- Matthew Young
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
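
For anyone who needs a workaround before 1.5, a small sketch of the two usual options, shown in Scala syntax (the PySpark DataFrame API has the same shape): bypass the string expression parser with a Column expression, or rename the aggregate column first.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val dataFrame = sc.parallelize(Seq("foo", "foo", "bar")).toDF("title")
val counts = dataFrame.groupBy("title").count()   // aggregate column is named "count"

// Option 1: a Column expression avoids the string parser entirely.
counts.filter(counts("count") > 1).show()

// Option 2: rename the column, then filter however you like.
counts.withColumnRenamed("count", "cnt").filter("cnt > 1").show()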


Re: assertion failed error with GraphX

2015-07-22 Thread Roman Sokolov
I am also having problems with triangle count - it seems this algorithm
is very memory consuming (I could not process even small graphs of ~5 million
vertices and 70 million edges with less than 32 GB RAM on EACH machine).
What if I have graphs with a billion edges, what amount of RAM do I need then?

So now I am trying to understand how it works and maybe rewrite it. I would
like to process big graphs without needing so much RAM on each machine.
Am 20.07.2015 04:27 schrieb "Jack Yang" :

>  Hi there,
>
>
>
> I got an error when running one simple graphX program.
>
> My setting is: spark 1.4.0, Hadoop yarn 2.5. scala 2.10. with four virtual
> machines.
>
>
>
> if I constructed one small graph (6 nodes, 4 edges), I run:
>
> println("triangleCount: %s ".format(
> hdfs_graph.triangleCount().vertices.count() ))
>
> that returns me the correct results.
>
>
>
> But I import a much larger graph (with 85 nodes, 500 edges), the
> error is
>
> 15/07/20 12:03:36 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
> 11.0 (TID 32, 192.168.157.131): java.lang.AssertionError: assertion failed
>
> at scala.Predef$.assert(Predef.scala:165)
>
> at
> org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:90)
>
> at
> org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:87)
>
> at
> org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:140)
>
> at
> org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:159)
>
> at
> org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156)
>
> at
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>
>
>
>
>
> I run the above two graphs using the same submit command:
>
> spark-submit --class "sparkUI.GraphApp" --master spark://master:7077
> --executor-memory 2G  --total-executor-cores 4 myjar.jar
>
>
>
> any thought? anything wrong with my machine or configuration?
>
>
>
>
>
>
>
>
>
> Best regards,
>
> Jack
>
>
>
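
For what it's worth, the assertion in TriangleCount is often a sign that one of its documented preconditions is not met: the input graph is expected to have its edges in canonical orientation (srcId < dstId) and to have been repartitioned with Graph.partitionBy. A sketch of loading an edge list so that both hold (the path is a placeholder):

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

val graph = GraphLoader
  .edgeListFile(sc, "hdfs:///graphs/edges.txt", canonicalOrientation = true)
  .partitionBy(PartitionStrategy.RandomVertexCut)

println("triangleCount: %s".format(graph.triangleCount().vertices.count()))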


Package Release Annoucement: Spark SQL on HBase "Astro"

2015-07-22 Thread Bing Xiao (Bing)
We are happy to announce the availability of the Spark SQL on HBase 1.0.0 
release.  http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase
The main features in this package, dubbed "Astro", include:

* Systematic and powerful handling of data pruning and intelligent 
scan, based on partial evaluation technique

* HBase pushdown capabilities like custom filters and coprocessor to 
support ultra low latency processing

* SQL, Data Frame support

* More SQL capabilities made possible (Secondary index, bloom filter, 
Primary Key, Bulk load, Update)

* Joins with data from other sources

* Python/Java/Scala support

* Support latest Spark 1.4.0 release


The tests by Huawei team and community contributors covered the areas: bulk 
load; projection pruning; partition pruning; partial evaluation; code 
generation; coprocessor; customer filtering; DML; complex filtering on keys and 
non-keys; Join/union with non-Hbase data; Data Frame; multi-column family test. 
 We will post the test results including performance tests the middle of August.
You are very welcome to try out or deploy the package, and to help improve the 
integration tests with various combinations of the settings, extensive Data 
Frame tests, complex join/union tests and extensive performance tests. Please 
use the "Issues" and "Pull Requests" links at the package homepage if you want to 
report bugs, improvements or feature requests.
Special thanks to project owner and technical leader Yan Zhou, Huawei global 
team, community contributors and Databricks.   Databricks has been providing 
great assistance from the design to the release.
"Astro", the Spark SQL on HBase package will be useful for ultra low latency 
query and analytics of large scale data sets in vertical enterprises. We will 
continue to work with the community to develop new features and improve code 
base.  Your comments and suggestions are greatly appreciated.

Yan Zhou / Bing Xiao
Huawei Big Data team



Comparison between Standalone mode and YARN mode

2015-07-22 Thread Dogtail Ray
Hi,

I am very curious about the differences between Standalone mode and YARN
mode. According to
http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/,
it seems that YARN mode is always better than Standalone mode. Is that the
case? Or should I choose different modes according to my specific
requirements? Thanks!


Re: No suitable driver found for jdbc:mysql://

2015-07-22 Thread Rishi Yadav
try setting --driver-class-path

On Wed, Jul 22, 2015 at 3:45 PM, roni  wrote:

> Hi All,
>  I have a cluster with spark 1.4.
> I am trying to save data to mysql but getting error
>
> Exception in thread "main" java.sql.SQLException: No suitable driver found
> for jdbc:mysql://<>.rds.amazonaws.com:3306/DAE_kmer?user=<>&password=<>
>
>
> *I looked at - https://issues.apache.org/jira/browse/SPARK-8463
>  and added the connector
> jar to the same location as on Master using copy-dir script.*
>
> *But I am still getting the same error. This sued to work with 1.3.*
>
> *This is my command  to run the program - **$SPARK_HOME/bin/spark-submit
> --jars
> /root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar
> --conf
> spark.executor.extraClassPath=/root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar
> --conf spark.executor.memory=55g --driver-memory=55g
> --master=spark://ec2-52-25-191-999.us-west-2.compute.amazonaws.com:7077
>   --class
> "saveBedToDB"  target/scala-2.10/adam-project_2.10-1.0.jar*
>
> *What else can I Do ?*
>
> *Thanks*
>
> *-Roni*
>
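
For reference, a sketch of the original spark-submit invocation with Rishi's suggestion added (other flags from the original command omitted for brevity):

$SPARK_HOME/bin/spark-submit \
  --jars /root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar \
  --driver-class-path /root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar \
  --conf spark.executor.extraClassPath=/root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar \
  --class "saveBedToDB" target/scala-2.10/adam-project_2.10-1.0.jar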


Re: spark-submit and spark-shell behaviors mismatch.

2015-07-22 Thread Yana Kadiyska
Is it complaining about "collect" or "toMap"? In either case this error is
indicative of an old version usually -- any chance you have an old
installation of Spark somehow? Or scala? You can try running spark-submit
with --verbose. Also, when you say it runs with spark-shell do you run
spark shell in local mode or with --master? I'd try with --master 

Also, if you're using standalone mode I believe the worker log contains the
launch command for the executor -- you probably want to examine that
classpath carefully

On Wed, Jul 22, 2015 at 5:25 PM, Dan Dong  wrote:

> Hi,
>
>   I have a simple test spark program as below, the strange thing is that
> it runs well under a spark-shell, but will get a runtime error of
>
> java.lang.NoSuchMethodError:
>
> in spark-submit, which indicate the line of:
>
> val maps2=maps.collect.toMap
>
> has problem. But why the compilation has no problem and it works well
> under spark-shell(==>maps2: scala.collection.immutable.Map[Int,String] =
> Map(269953 -> once, 97 -> a, 451002 -> upon, 117481 -> was, 226916 ->
> there, 414413 -> time, 146327 -> king) )? Thanks!
>
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import org.apache.spark.mllib.feature.HashingTF
> import org.apache.spark.mllib.linalg.Vector
> import org.apache.spark.rdd.RDD
> import org.apache.spark.SparkContext
> import org.apache.spark._
> import SparkContext._
>
>
> val docs=sc.parallelize(Array(Array("once" ,"upon", "a", "time"), 
> Array("there", "was", "a", "king")))
>
> val hashingTF = new HashingTF()
>
> val maps=docs.flatMap{term=>term.map(ele=>(hashingTF.indexOf(ele),ele))}
>
> val maps2=maps.collect.toMap
>
>
> Cheers,
>
> Dan
>
>
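
Since a NoSuchMethodError at run time usually means the jar was compiled against a different Spark or Scala version than the cluster is running, a build.sbt along these lines may help; it pins both and marks Spark as "provided". The version numbers are only an example and must match the cluster.

name := "hashing-test"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.4.1" % "provided"
)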


ShuffledHashJoin instead of CartesianProduct

2015-07-22 Thread Srikanth
Hello,

I'm trying to link records from two large data sources. Both datasets have
almost same number of rows.
Goal is to match records based on multiple columns.

val matchId =
> SFAccountDF.as("SF").join(ELAccountDF.as("EL")).where($"SF.Email" ===
> $"EL.EmailAddress" || $"SF.Phone" === "EL.Phone")


Joining with a OR(||) will result in a CartesianProduct. I'm trying to
avoid that.
One way to do this is to join on each column and UNION the results.


> val phoneMatch = SFAccountDF.as("SF").filter("Phone !=
> ''").join(ELAccountDF.as("EL").filter("BusinessPhone !=
> ''")).where($"SF.Phone" === $"EL.BusinessPhone")
> val emailMatch = SFAccountDF.as("SF").filter("Email !=
> ''").join(ELAccountDF.as("EL").filter("EmailAddress !=
> ''")).where($"SF.Email" === $"EL.EmailAddress")
>
> val matchId =
> phoneMatch.unionAll(emailMatch.unionAll(faxMatch.unionAll(mobileMatch)))
> matchId.cache().registerTempTable("matchId")


Is there a more elegant way to do this?

On a related note, has anyone worked on record linkage using Bloom Filters,
Levenshtein distance, etc in Spark?

Srikanth


What if request cores are not satisfied

2015-07-22 Thread bit1...@163.com
Hi,
Assume the following scenario:
The Spark standalone cluster has 10 cores in total, and I have an application that
will request 12 cores. Will the application run with fewer cores than requested,
or will it simply wait forever since there are only 10 cores available?
I would guess it will run with fewer cores, but I didn't get a chance to
try/test it.
Thanks.




bit1...@163.com


Re: Package Release Annoucement: Spark SQL on HBase "Astro"

2015-07-22 Thread Debasish Das
Does it also support insert operations ?
On Jul 22, 2015 4:53 PM, "Bing Xiao (Bing)"  wrote:

>  We are happy to announce the availability of the Spark SQL on HBase
> 1.0.0 release.
> http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase
>
> The main features in this package, dubbed “Astro”, include:
>
> · Systematic and powerful handling of data pruning and
> intelligent scan, based on partial evaluation technique
>
> · HBase pushdown capabilities like custom filters and coprocessor
> to support ultra low latency processing
>
> · SQL, Data Frame support
>
> · More SQL capabilities made possible (Secondary index, bloom
> filter, Primary Key, Bulk load, Update)
>
> · Joins with data from other sources
>
> · Python/Java/Scala support
>
> · Support latest Spark 1.4.0 release
>
>
>
> The tests by Huawei team and community contributors covered the areas:
> bulk load; projection pruning; partition pruning; partial evaluation; code
> generation; coprocessor; customer filtering; DML; complex filtering on keys
> and non-keys; Join/union with non-Hbase data; Data Frame; multi-column
> family test.  We will post the test results including performance tests the
> middle of August.
>
> You are very welcomed to try out or deploy the package, and help improve
> the integration tests with various combinations of the settings, extensive
> Data Frame tests, complex join/union test and extensive performance tests.
> Please use the “Issues” “Pull Requests” links at this package homepage, if
> you want to report bugs, improvement or feature requests.
>
> Special thanks to project owner and technical leader Yan Zhou, Huawei
> global team, community contributors and Databricks.   Databricks has been
> providing great assistance from the design to the release.
>
> “Astro”, the Spark SQL on HBase package will be useful for ultra low
> latency* query and analytics of large scale data sets in vertical
> enterprises**.* We will continue to work with the community to develop
> new features and improve code base.  Your comments and suggestions are
> greatly appreciated.
>
>
>
> Yan Zhou / Bing Xiao
>
> Huawei Big Data team
>
>
>


Re: Need help in setting up spark cluster

2015-07-22 Thread Jeetendra Gangele
Can anybody help here?

On 22 July 2015 at 10:38, Jeetendra Gangele  wrote:

> Hi All,
>
> I am trying to capture the user activities for real estate portal.
>
> I am using RabbitMS and Spark streaming combination where all the Events I
> am pushing to RabbitMQ and then 1 secs micro job I am consuming using Spark
> streaming.
>
> Later on I am thinking to store the consumed data for analytics or near
> real time recommendations.
>
> Where should I store this data in Spark RDD itself and using SparkSQL
> people can query this data for analytics or real time recommendations, this
> data is not huge currently its 10 GB per day.
>
> Another alternative would be either HBase or Cassandra; which one would be
> better?
>
> Any suggestions?
>
>
> Also, for this use case, should I use an existing big data platform like
> Hortonworks, or can I deploy a standalone Spark cluster?
>
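
For illustration, a minimal sketch of the usual pattern for getting
micro-batches out of Spark Streaming and into durable storage, rather than
holding them in long-lived RDDs (the stream, its element type, and the HDFS
path below are placeholders):

import org.apache.spark.streaming.dstream.DStream

// eventStream stands in for the DStream produced by the RabbitMQ receiver;
// its element type is just String here for illustration.
def persistEvents(eventStream: DStream[String]): Unit = {
  eventStream.foreachRDD { (rdd, time) =>
    if (!rdd.isEmpty()) {
      // Write each micro-batch to durable storage; downstream analytics
      // (Spark SQL over the stored files, or loads into HBase/Cassandra)
      // can then read from this location instead of querying live RDDs.
      rdd.saveAsTextFile(s"hdfs:///events/batch-${time.milliseconds}")
    }
  }
}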


Re: Need help in SparkSQL

2015-07-22 Thread Jeetendra Gangele
The queries will be something like this:

1. How many users visited a 1 BHK flat in the last 1 hour in a given area?
2. How many visitors were there for flats in a given area?
3. List all users who bought a given property in the last 30 days.

Further, the queries may get more complex, involving multiple parameters.

The problem with HBase is designing a row key that serves this data efficiently.

Since I have multiple fields to query on, HBase may not be a good choice?

I don't want to iterate over the result set that HBase returns and assemble the
answer myself, because that will kill performance.
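
For what it's worth, once such events are exposed as a table, query 1 above
becomes a single aggregation in Spark SQL. A sketch, assuming a registered
table and invented column names:

import org.apache.spark.sql.hive.HiveContext

// "visits" is assumed to be a registered table of user-activity events with
// user_id, flat_type, area, event_type and ts (epoch millis) columns; all of
// these names are invented for illustration.
def bhkVisitorsPerArea(hiveContext: HiveContext, oneHourAgoMillis: Long) = {
  hiveContext.sql(
    s"""SELECT area, COUNT(DISTINCT user_id) AS visitors
       |FROM visits
       |WHERE flat_type = '1BHK'
       |  AND event_type = 'visit'
       |  AND ts >= $oneHourAgoMillis
       |GROUP BY area""".stripMargin)
}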

On 23 July 2015 at 01:02, Jörn Franke  wrote:

> Can you provide an example of an AND query? If you do just look-ups, you
> should try HBase/Phoenix; otherwise you can try ORC with storage indexes
> and/or compression, but this depends on what your queries look like.
>
> On 22 July 2015 at 14:48, Jeetendra Gangele  wrote:
>
>> Hi All,
>>
>> I have data in MongoDB (a few TBs) that I want to migrate to HDFS to do
>> complex query analysis on. The queries are things like AND queries involving
>> multiple fields.
>>
>> So my question is: in which format should I store the data in HDFS so
>> that processing will be fast for such queries?
>>
>>
>> Regards
>> Jeetendra
>>
>>


-- 
Hi,

Please find my resume attached. I have around 7 years of work experience in total.
I worked for Amazon and Expedia in my previous assignments, and currently I
am working with a start-up technology company called Insideview in Hyderabad.

Regards
Jeetendra


Hive Session gets overwritten in ClientWrapper

2015-07-22 Thread Vishak
I'm currently using Spark 1.4 in standalone mode.

I've forked the Apache Hive branch from https://github.com/pwendell/hive
and customised it in the following way.

I added a thread-local variable in the SessionManager class, and I'm setting the
session variable in my custom Authenticator class.

To achieve the above, I've built the necessary
jars (hive-common-0.13.1c.jar, hive-exec-0.13.1c.jar,
hive-metastore-0.13.1c.jar, hive-serde-0.13.1c.jar,
hive-service-0.13.1c.jar) from https://github.com/pwendell/hive and added them
to Spark's classpath.
The above feature works in Spark 1.3.1, but is broken in Spark 1.4.
When I looked into it, I found that the ClientWrapper class creates
a new SessionState and uses it thereafter. As a result, I'm not able to
retrieve the information that I had stored earlier in the session, and I'm also
not able to retrieve a value from HiveConf that was set earlier.

When I looked into the source code for ClientWrapper.scala, I found
the following:

  // Create an internal session state for this ClientWrapper.
  val state = {
    val original = Thread.currentThread().getContextClassLoader
    // Switch to the initClassLoader.
    Thread.currentThread().setContextClassLoader(initClassLoader)

From what I can understand, the above tries to use the existing Hive
session, or else it creates its own session. Am I right? If so, is there a bug
causing the ClientWrapper not to use the existing session? Or should I
implement my requirement in a different way?

My requirement is to have a custom session variable and use it throughout
the session.

My usage is as follows:

To set the value
SessionManager.setSessionVar("value");

To retrieve the value
SessionManager.getSessionVar();

To set a hiveconf
hiveConf.set(conf, ConfVars.VAR, "val");

To retrieve
hiveConf.get(ConfVars.VAR);
SessionState.get().getConf().getVar(ConfVars.VAR)







Re: Need help in SparkSQL

2015-07-22 Thread Jörn Franke
I do not think you can put all your queries into the row key without
duplicating the data for each query; that would be more of a last
resort.

Have you checked out Phoenix for HBase? It might suit your needs. It
makes things much simpler, because it provides SQL on top of HBase.

Nevertheless, Hive could also be a viable alternative, depending on how
often you run queries, etc.
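
As a rough illustration of the "SQL on top of HBase" point, a sketch of a
Phoenix query over JDBC (the ZooKeeper quorum, table, and column names are all
invented, and it assumes the Phoenix client jar is on the classpath):

import java.sql.DriverManager

// Phoenix maps a SQL schema (and its primary key) onto HBase row keys, so a
// query like "list all users who bought a given property in the last 30 days"
// can be written as plain SQL instead of hand-rolled scans. purchase_ts is
// assumed here to be an epoch-millis BIGINT column.
def recentBuyers(zkQuorum: String, sinceMillis: Long): Unit = {
  val conn = DriverManager.getConnection(s"jdbc:phoenix:$zkQuorum")
  try {
    val rs = conn.createStatement().executeQuery(
      s"""SELECT user_id, property_id, purchase_ts
         |FROM property_events
         |WHERE event_type = 'purchase' AND purchase_ts >= $sinceMillis
         |ORDER BY purchase_ts DESC""".stripMargin)
    while (rs.next()) {
      println(s"${rs.getString(1)} bought ${rs.getString(2)}")
    }
  } finally {
    conn.close()
  }
}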

On 23 July 2015 at 7:14, Jeetendra Gangele  wrote:

> The queries will be something like this:
>
> 1. How many users visited a 1 BHK flat in the last 1 hour in a given area?
> 2. How many visitors were there for flats in a given area?
> 3. List all users who bought a given property in the last 30 days.
>
> Further, the queries may get more complex, involving multiple parameters.
>
> The problem with HBase is designing a row key that serves this data efficiently.
>
> Since I have multiple fields to query on, HBase may not be a good choice?
>
> I don't want to iterate over the result set that HBase returns and assemble the
> answer myself, because that will kill performance.
>
> On 23 July 2015 at 01:02, Jörn Franke  wrote:
>
>> Can you provide an example of an AND query? If you do just look-ups, you
>> should try HBase/Phoenix; otherwise you can try ORC with storage indexes
>> and/or compression, but this depends on what your queries look like.
>>
>> On 22 July 2015 at 14:48, Jeetendra Gangele 
>> wrote:
>>
>>> Hi All,
>>>
>>> I have data in MongoDB (a few TBs) that I want to migrate to HDFS to do
>>> complex query analysis on. The queries are things like AND queries involving
>>> multiple fields.
>>>
>>> So my question is: in which format should I store the data in HDFS so
>>> that processing will be fast for such queries?
>>>
>>>
>>> Regards
>>> Jeetendra
>>>
>>>
>
>
> --
> Hi,
>
> Please find my resume attached. I have around 7 years of work experience in total.
> I worked for Amazon and Expedia in my previous assignments, and currently I
> am working with a start-up technology company called Insideview in Hyderabad.
>
> Regards
> Jeetendra
>


Re: Re: Need help in setting up spark cluster

2015-07-22 Thread fightf...@163.com
Hi, there

For your analytics and near-real-time recommendation requirements, I would 
recommend using Spark SQL and the Hive thriftserver to store and process your 
Spark Streaming data. The thriftserver runs as a long-lived application, so it 
is quite feasible to periodically consume data and serve your analytical 
requirements from it.

On the other hand, HBase or Cassandra would also be sufficient, and I think you 
may want to integrate Spark SQL with HBase/Cassandra for your data ingestion. 
You could deploy a CDH or HDP platform to support your production environment. 
I suggest you first deploy a Spark standalone cluster to run some integration 
tests; you can also consider running Spark on YARN for later development 
use cases.
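
To make the Spark SQL / thriftserver suggestion concrete, a small sketch of
appending each streaming micro-batch to a Hive-managed table, which a
thriftserver sharing the same metastore could then query (this assumes Spark
1.4's DataFrame writer API; the Event fields and table name are made up):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

// Illustrative event type; the fields are invented.
case class Event(userId: Long, eventType: String, area: String, ts: Long)

// Append one micro-batch to a Hive-managed table. A thriftserver pointed at
// the same metastore can then serve SQL queries over "user_events" while the
// streaming job keeps appending.
def appendBatch(hiveContext: HiveContext, batch: RDD[Event]): Unit = {
  import hiveContext.implicits._
  batch.toDF().write.mode(SaveMode.Append).saveAsTable("user_events")
}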

Best,
Sun.



fightf...@163.com
 
From: Jeetendra Gangele
Date: 2015-07-23 13:39
To: user
Subject: Re: Need help in setting up spark cluster
Can anybody help here?

On 22 July 2015 at 10:38, Jeetendra Gangele  wrote:
Hi All, 

I am trying to capture user activities for a real estate portal.

I am using a RabbitMQ and Spark Streaming combination, where I push all the 
events to RabbitMQ and then consume them in 1-second micro-batches using Spark 
Streaming.

Later on I am planning to store the consumed data for analytics or near-real-time 
recommendations.

Where should I store this data? Should it live in Spark RDDs themselves so that, 
using Spark SQL, people can query it for analytics or real-time recommendations? 
The data is not huge; currently it's about 10 GB per day.

Another alternative would be either HBase or Cassandra; which one would be 
better?

Any suggestions?


Also, for this use case, should I use an existing big data platform like 
Hortonworks, or can I deploy a standalone Spark cluster?