Re: Reading SequenceFiles from S3 with PySpark on EMR causes RACK_LOCAL locality

2015-07-17 Thread michal.klo...@gmail.com
Make sure you are setting the number of executors correctly.
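
For example, when launching PySpark against YARN (hypothetical sizing -- adjust to your cluster):

    pyspark --master yarn-client --num-executors 50 --executor-cores 4 --executor-memory 4g

If you don't request executors explicitly, YARN mode defaults to only 2 of them, which would line up with the two busy IPs in your logs.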

M



> On Jul 17, 2015, at 9:16 PM, Charles Menguy  wrote:
> 
> I am trying to use PySpark on EMR to analyze some data stored as 
> SequenceFiles on S3, but running into performance issues due to data 
> locality. Here is a very simple sample that doesn't work well:
> 
> seqRDD = 
> sc.sequenceFile("s3n://:@//day=2015-07-04/hour=*/*")
> seqRDD.count()
> 
> The issue is with the count action: it works, but the distribution of the 
> tasks is very poor. For some reason the Spark logs show only 2 IPs of the 
> cluster doing any actual work while the rest sit idle. I tried with a 5-node 
> cluster and a 50-node cluster, and it's always only 2 IPs appearing in the logs.
> 
> Also very strange is that these 2 IPs have a locality of RACK_LOCAL. I'm 
> presuming that's because the data is in S3, so it's not local, but how can I 
> make Spark use the whole cluster instead of just 2 instances?
> 
> I didn't do anything specific for the Spark configuration on EMR; I simply 
> installed it via the native EMR application, and I believe that takes care of 
> optimizing the configs automatically. I ran PySpark with --master yarn-client.
> 
> I saw this in the logs; the allowLocal=false could be an issue, but I couldn't 
> find anything on it:
> 
> 15/07/17 23:55:27 INFO spark.SparkContext: Starting job: count at :1
> 15/07/17 23:55:27 INFO scheduler.DAGScheduler: Got job 1 (count at :1) with 
> 1354 output partitions (allowLocal=false)
> 15/07/17 23:55:27 INFO scheduler.DAGScheduler: Final stage: Stage 1(count at 
> :1)
> 
> Some logs that follow when running the count, showing only 2 IPs:
> 
> 15/07/17 23:55:28 INFO scheduler.DAGScheduler: Submitting 1354 missing tasks 
> from Stage 1 (PythonRDD[3] at count at :1)
> 15/07/17 23:55:28 INFO cluster.YarnScheduler: Adding task set 1.0 with 1354 
> tasks
> 15/07/17 23:55:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 
> 1.0 (TID 1, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1418 bytes)
> 15/07/17 23:55:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 
> 1.0 (TID 2, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:28 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in 
> memory on ip-172-31-36-179.ec2.internal:39998 (size: 3.7 KB, free: 535.0 MB)
> 15/07/17 23:55:28 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in 
> memory on ip-172-31-41-210.ec2.internal:36847 (size: 3.7 KB, free: 535.0 MB)
> 15/07/17 23:55:29 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
> memory on ip-172-31-41-210.ec2.internal:36847 (size: 18.8 KB, free: 535.0 MB)
> 15/07/17 23:55:31 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 
> 1.0 (TID 3, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1421 bytes)
> 15/07/17 23:55:31 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 
> 1.0 (TID 1) in 3501 ms on ip-172-31-41-210.ec2.internal (1/1354)
> 15/07/17 23:55:31 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 
> 1.0 (TID 4, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:31 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 
> 1.0 (TID 3) in 99 ms on ip-172-31-41-210.ec2.internal (2/1354)
> 15/07/17 23:55:33 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 
> 1.0 (TID 5, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:33 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 
> 1.0 (TID 2) in 5190 ms on ip-172-31-36-179.ec2.internal (3/1354)
> 15/07/17 23:55:36 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 
> 1.0 (TID 6, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:36 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 
> 1.0 (TID 4) in 4471 ms on ip-172-31-41-210.ec2.internal (4/1354)
> 15/07/17 23:55:37 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 
> 1.0 (TID 7, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:37 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 
> 1.0 (TID 5) in 3676 ms on ip-172-31-36-179.ec2.internal (5/1354)
> 15/07/17 23:55:40 INFO scheduler.TaskSetManager: Starting task 7.0 in stage 
> 1.0 (TID 8, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:40 INFO scheduler.TaskSetManager: Finished task 5.0 in stage 
> 1.0 (TID 6) in 3895 ms on ip-172-31-41-210.ec2.internal (6/1354)
> 15/07/17 23:55:40 INFO scheduler.TaskSetManager: Starting task 8.0 in stage 
> 1.0 (TID 9, ip-1
> 
> I also tried eliminating S3 by first distcp'ing the S3 data into HDFS on the 
> EMR cluster and then running a count() on that, but it doesn't make much 
> difference: there are still only 2 IPs processing. They initially start as 
> NODE_LOCAL but eventually switch to RACK_LOCAL.
> 
> I'm at a loss as to what I have misconfigured; any help would be appreciated.
> 
> Thanks !
> 
> Charles


Re: Spark 1.5.1 standalone cluster - wrong Akka remoting config?

2015-10-08 Thread michal.klo...@gmail.com
Try setting spark.driver.host to the actual IP or hostname of the box 
submitting the work. More info in the networking section of this link:

http://spark.apache.org/docs/latest/configuration.html
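
For example, in the application code before creating the context (a minimal sketch; the master URL and IP below are placeholders, with the IP standing in for the box actually submitting the work):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("my-app")
      .setMaster("spark://your-master:7077")      // placeholder master URL
      .set("spark.driver.host", "192.168.10.50")  // reachable address of the submitting box
    val sc = new SparkContext(conf)

The same property can also be passed as --conf spark.driver.host=... if you go through spark-submit.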

Also check the Spark config for your application for these driver settings in 
the application web UI at http://:4040 in the "Environment" tab. More 
info in the "viewing configuration properties" section of that link.

M



> On Oct 8, 2015, at 7:04 AM, baraky  wrote:
> 
> Taking my first steps with Spark, I'm facing problems submitting jobs to the
> cluster from the application code. Digging through the logs, I noticed periodic
> WARN messages in the master log:
> 
> 15/10/08 13:00:00 WARN remote.ReliableDeliverySupervisor: Association with
> remote system [akka.tcp://sparkDriver@192.168.254.167:64014] has failed,
> address is now gated for [5000] ms. Reason: [Disassociated]
> 
> The problem is that this IP address does not exist on our network and wasn't
> configured anywhere. The same wrong IP is shown in the worker log when it
> tries to execute the task (the wrong IP is passed to --driver-url):
> 
> 15/10/08 12:58:21 INFO worker.ExecutorRunner: Launch command:
> "/usr/java/latest//bin/java" "-cp" "/path/spark/spark-1.5.1-bin-ha
> doop2.6/sbin/../conf/:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/spark-assembly-1.5.1-hadoop2.6.0.jar:/path/spark/
> spark-1.5.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.ja
> r:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/path/hadoop/2.6.0//etc/hadoop/"
> "-Xms102
> 4M" "-Xmx1024M" "-Dspark.driver.port=64014" "-Dspark.driver.port=53411"
> "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url"
> "akka.tcp://sparkDriver@192.168.254.167:64014/user/CoarseGrainedScheduler"
> "--executor-id" "39" "--hostname" "192.168.10.214" "--cores" "16" "--app-id" 
> "app-20151008123702-0003" "--worker-url"
> "akka.tcp://sparkWorker@192.168.10.214:37625/user/Worker"
> 15/10/08 12:59:28 INFO worker.Worker: Executor app-20151008123702-0003/39
> finished with state EXITED message Command exited with code 1 exitStatus 1
> Any idea what I did wrong and how this can be fixed?
> 
> The java version is 1.8.0_20, and I'm using pre-built Spark binaries.
> 
> Thanks!
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-1-standalone-cluster-wrong-Akka-remoting-config-tp24978.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 


Re: Scalable JDBCRDD

2015-03-01 Thread michal.klo...@gmail.com
Jorn: Vertica 

Cody: I posited the limit just as an example of how JdbcRDD could be used least 
invasively. Let's say we partitioned on a time field -- we would still need to 
run N executions of those queries. The queries we have are very intense, and 
concurrency is an issue even if the N partitioned queries are smaller. Some 
queries require evaluating the whole data set first. If our use case were a 
simple select * from table, then the partitions would be an easier sell, if it 
weren't for the concurrency problem :) Long story short -- we need only one 
execution of the query and would like to just divvy out the result set.
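
To make "one execution, divvy out the rows" concrete, here's a minimal driver-side sketch (the connection string, credentials, and query are placeholders; it has the same fits-in-driver-memory limitation as our JDBCRDD fork):

    import java.sql.DriverManager
    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    def queryOnceAndDistribute(sc: SparkContext, numPartitions: Int): RDD[Seq[Any]] = {
      // Run the intense query exactly once, on the driver
      val conn = DriverManager.getConnection("jdbc:vertica://host:5433/db", "user", "pass")
      val rows = ArrayBuffer[Seq[Any]]()
      try {
        val rs = conn.createStatement().executeQuery("SELECT ...")  // placeholder for the real query
        val cols = rs.getMetaData.getColumnCount
        while (rs.next()) rows += (1 to cols).map(i => rs.getObject(i))
      } finally conn.close()
      // Let Spark slice the in-memory rows across the workers
      sc.parallelize(rows, numPartitions)
    }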

M



> On Mar 1, 2015, at 5:18 AM, Jörn Franke  wrote:
> 
> What database are you using?
> 
> On Feb 28, 2015, at 18:15, "Michal Klos"  wrote:
>> Hi Spark community,
>> 
>> We have a use case where we need to pull huge amounts of data from a SQL 
>> query against a database into Spark. We need to execute the query against 
>> our huge database and not a substitute (SparkSQL, Hive, etc) because of a 
>> couple of factors including custom functions used in the queries that only 
>> our database has.
>> 
>> We started by looking at JDBC RDD, which utilizes a prepared statement with 
>> two parameters that are meant to be used to partition the result set to the 
>> workers... e.g.:
>> 
>> select * from table limit ?,?
>> 
>> turns into
>> 
>> select * from table limit 1,100 on worker 1
>> select * from table limit 101,200 on worker 2
>> 
>> This will not work for us because our database cannot support multiple 
>> executions of these queries without being crippled. Additionally, our 
>> database doesn't support the above LIMIT syntax, and we don't have a generic 
>> way of partitioning the various queries.
>> 
>> As a result -- we started by forking JDBCRDD and made a version that executes 
>> the SQL query once in getPartitions into a Vector and then hands each worker 
>> node an index and an iterator. Here's a snippet of getPartitions and compute:
>> 
>>   override def getPartitions: Array[Partition] = {
>>     // Compute the DB query once here
>>     val results = computeQuery
>> 
>>     (0 until numPartitions).map(i => {
>>       // TODO: would be better to do this partitioning when scrolling
>>       // through the result set, if still loading into memory
>>       val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>>       new DBPartition(i, partitionItems)
>>     }).toArray
>>   }
>> 
>>   override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
>>     val part = thePart.asInstanceOf[DBPartition[T]]
>> 
>>     // Shift the result vector to our index number and then do a sliding iterator over it
>>     val iterator = part.items.iterator
>> 
>>     override def getNext: T = {
>>       if (iterator.hasNext) {
>>         iterator.next()
>>       } else {
>>         finished = true
>>         null.asInstanceOf[T]
>>       }
>>     }
>> 
>>     override def close: Unit = ()
>>   }
>> 
>> This is a little better since we can just execute the query once. However, 
>> the result-set needs to fit in memory. 
>> 
>> We've been trying to brainstorm a way to 
>> 
>> A) have that result set distribute out to the worker RDD partitions as it's 
>> streaming in from the cursor?
>> B) have the result set spill to disk if it exceeds memory and do something 
>> clever around the iterators?
>> C) something else?
>> 
>> We're not familiar enough yet with all of the workings of Spark to know how 
>> to proceed on this.
>> 
>> We also thought of the work-around of having the DB query dump to HDFS/S3 
>> and then picking it up from there, but that adds more moving parts and latency 
>> to our processing. 
>> 
>> Does anyone have a clever suggestion? Are we missing something? 
>> 
>> thanks,
>> Michal


Re: Scalable JDBCRDD

2015-03-01 Thread michal.klo...@gmail.com
Yes exactly.

The temp table is an approach, but then we need to manage deleting it, etc.

I'm sure we won't be the only people with this crazy use case. 

If there isn't a feasible way to do this "within the framework," then that's 
okay. But if there is a way, we are happy to write the code and PR it back :)

M



> On Mar 1, 2015, at 10:02 AM, eric  wrote:
> 
> What you're saying is that, due to the intensity of the query, you need to 
> run a single query and partition the results, versus running one query for 
> each partition.
> 
> I assume it's not viable to throw the query results into another table in 
> your database and then query that using the normal approach?
> 
> --eric
> 
>> On 3/1/15 4:28 AM, michal.klo...@gmail.com wrote:
>> Jorn: Vertica 
>> 
>> Cody: I posited the limit just as an example of how JdbcRDD could be used 
>> least invasively. Let's say we partitioned on a time field -- we would 
>> still need to run N executions of those queries. The queries we have are 
>> very intense, and concurrency is an issue even if the N partitioned 
>> queries are smaller. Some queries require evaluating the whole data set 
>> first. If our use case were a simple select * from table, then the partitions 
>> would be an easier sell, if it weren't for the concurrency problem :) Long 
>> story short -- we need only one execution of the query and would like to 
>> just divvy out the result set.
>> 
>> M
>> 
>> 
>> 
>> On Mar 1, 2015, at 5:18 AM, Jörn Franke  wrote:
>> 
>>> What database are you using?
>>> 
>>> On Feb 28, 2015, at 18:15, "Michal Klos"  wrote:
>>>> Hi Spark community,
>>>> 
>>>> We have a use case where we need to pull huge amounts of data from a SQL 
>>>> query against a database into Spark. We need to execute the query against 
>>>> our huge database and not a substitute (SparkSQL, Hive, etc) because of a 
>>>> couple of factors including custom functions used in the queries that only 
>>>> our database has.
>>>> 
>>>> We started by looking at JDBC RDD, which utilizes a prepared statement 
>>>> with two parameters that are meant to be used to partition the result set 
>>>> to the workers... e.g.:
>>>> 
>>>> select * from table limit ?,?
>>>> 
>>>> turns into
>>>> 
>>>> select * from table limit 1,100 on worker 1
>>>> select * from table limit 101,200 on worker 2
>>>> 
>>>> This will not work for us because our database cannot support multiple 
>>>> executions of these queries without being crippled. Additionally, our 
>>>> database doesn't support the above LIMIT syntax, and we don't have a 
>>>> generic way of partitioning the various queries.
>>>> 
>>>> As a result -- we started by forking JDBCRDD and made a version that 
>>>> executes the SQL query once in getPartitions into a Vector and then hands 
>>>> each worker node an index and an iterator. Here's a snippet of getPartitions 
>>>> and compute:
>>>> 
>>>>   override def getPartitions: Array[Partition] = {
>>>>     // Compute the DB query once here
>>>>     val results = computeQuery
>>>> 
>>>>     (0 until numPartitions).map(i => {
>>>>       // TODO: would be better to do this partitioning when scrolling
>>>>       // through the result set, if still loading into memory
>>>>       val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>>>>       new DBPartition(i, partitionItems)
>>>>     }).toArray
>>>>   }
>>>> 
>>>>   override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
>>>>     val part = thePart.asInstanceOf[DBPartition[T]]
>>>> 
>>>>     // Shift the result vector to our index number and then do a sliding iterator over it
>>>>     val iterator = part.items.iterator
>>>> 
>>>>     override def getNext: T = {
>>>>       if (iterator.hasNext) {
>>>>         iterator.next()
>>>>       } else {
>>>>         finished = true
>>>>         null.asInstanceOf[T]
>>>>       }
>>>>     }
>>>> 
>>>>     override def close: Unit = ()
>>>>   }
>>>> 
>>>> This is a little better since we can just execute the query once. However, 
>>>> the result-set needs to fit in memory. 
>>>> We've been trying to brainstorm a way to 
>>>> A) have that result set distribute out to the worker RDD partitions as 
>>>> it's streaming in from the cursor?
>>>> B) have the result set spill to disk if it exceeds memory and do something 
>>>> clever around the iterators?
>>>> C) something else?
>>>> We're not familiar enough yet with all of the workings of Spark to know 
>>>> how to proceed on this.
>>>> We also thought of the work-around of having the DB query dump to 
>>>> HDFS/S3 and then picking it up from there, but that adds more moving parts 
>>>> and latency to our processing. 
>>>> Does anyone have a clever suggestion? Are we missing something? 
>>>> thanks,
>>>> Michal
> 


Re: Job submission API

2015-04-07 Thread michal.klo...@gmail.com
A SparkContext can submit jobs remotely.

The spark-submit options in general can be populated into a SparkConf and 
passed in when you create a SparkContext.

We personally have not had too much success with yarn-client remote submission, 
but standalone cluster mode was easy to get going.
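
As a rough sketch (the master URL, app name, jar path, and memory setting are all placeholders -- adjust for your environment):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("my-app")
      .setMaster("spark://master01:7077")       // standalone master URL (placeholder)
      .setJars(Seq("/path/to/my-app.jar"))      // ship the application jar to the executors
      .set("spark.executor.memory", "4g")
    val sc = new SparkContext(conf)
    // Any action on an RDD built from sc (e.g. rdd.count()) now submits a job to the cluster.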

M



> On Apr 7, 2015, at 7:01 PM, Prashant Kommireddi  wrote:
> 
> Hello folks,
> 
> Newbie here! Just had a quick question - is there a job submission API, such 
> as the one in Hadoop 
> https://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/mapreduce/Job.html#submit()
> to submit Spark jobs to a YARN cluster? I see in the examples that 
> bin/spark-submit is what's out there, but I couldn't find any APIs around it.
> 
> Thanks,
> Prashant


Re: Querying Cluster State

2015-04-26 Thread michal.klo...@gmail.com
Not sure if there's a Spark-native way, but we've been using Consul for this.

M



> On Apr 26, 2015, at 5:17 AM, James King  wrote:
> 
> Thanks for the response.
> 
> But no, this does not answer the question.
> 
> The question was: is there a way (via some API call) to query the number and 
> type of daemons currently running in the Spark cluster?
> 
> Regards
> 
> 
>> On Sun, Apr 26, 2015 at 10:12 AM, ayan guha  wrote:
>> In my limited understanding, there must be a single "leader" master in the 
>> cluster. If there are multiple leaders, it will lead to an unstable cluster, as 
>> each master will keep scheduling independently. You should use ZooKeeper 
>> for HA, so that standby masters can vote to find a new leader if the primary 
>> goes down. 
>> 
>> Now, you can still have multiple masters running as leaders, but conceptually 
>> they should be thought of as different clusters.
>> 
>> Regarding workers, they should follow their master.  
>> 
>> Not sure if this answers your question, as I am sure you have read the 
>> documentation thoroughly.
>> 
>> Best
>> Ayan
>> 
>>> On Sun, Apr 26, 2015 at 6:31 PM, James King  wrote:
>>> If I have 5 nodes and I wish to maintain 1 Master and 2 Workers on each 
>>> node, then in total I will have 5 Masters and 10 Workers.
>>> 
>>> Now, to maintain that setup I would like to query Spark regarding the number of 
>>> Masters and Workers that are currently available using API calls, and then 
>>> take some appropriate action based on the information I get back, like 
>>> restarting a dead Master or Worker.
>>> 
>>> Is this possible? Does Spark provide such an API?
>> 
>> 
>> 
>> -- 
>> Best Regards,
>> Ayan Guha
> 


Re: submitting to multiple masters

2015-04-28 Thread michal.klo...@gmail.com
According to the docs it should go like this:
spark://host1:port1,host2:port2


https://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper
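
So for your spark-submit invocation that would be a single URL with one spark:// prefix covering the whole comma-separated list (hosts taken from your example):

    --master spark://master01:7077,master02:7077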

Thanks
M


> On Apr 28, 2015, at 8:13 AM, James King  wrote:
> 
> I have multiple masters running and I'm trying to submit an application using
> 
> spark-1.3.0-bin-hadoop2.4/bin/spark-submit  
> 
> with this config (i.e. a comma-separated list of master URLs):
> 
>   --master spark://master01:7077,spark://master02:7077
> 
> 
> But I'm getting this exception:
> 
> Exception in thread "main" org.apache.spark.SparkException: Invalid master 
> URL: spark://spark://master02:7077
> 
> 
> What am I doing wrong?
> 
> Many Thanks
> jk


Re: How to get Master UI with ZooKeeper HA setup?

2015-05-12 Thread michal.klo...@gmail.com
I've been querying ZooKeeper directly via the ZooKeeper client tools; it has 
the IP of the current master leader in the master_status data. We are also 
running Exhibitor for ZooKeeper, which has a nice UI for exploring if you want 
to look it up manually.
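
For reference, a rough manual lookup with the stock ZooKeeper CLI might look like this (assuming the default spark.deploy.zookeeper.dir of /spark and a placeholder ZooKeeper host; the exact znode layout may differ in your setup):

    zkCli.sh -server zk-host:2181
    ls /spark/master_status
    get /spark/master_status/<child-node>

The leader's address shows up in that data, and the Master UI is then http://that-host:8080 by default.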

Thanks,
Michal



> On May 12, 2015, at 1:28 AM, Rex Xiong  wrote:
> 
> Hi,
> 
> We have a 3-node master setup with ZooKeeper HA.
> The driver can find the master with spark://xxx:xxx,xxx:xxx,xxx:xxx.
> But how can I find the active Master's UI without looping through all 3 
> nodes?
> 
> Thanks
