Re: Reading SequenceFiles from S3 with PySpark on EMR causes RACK_LOCAL locality
Make sure you are setting the number of executors correctly -- with --master yarn-client, Spark on YARN only asks for 2 executors by default, which would explain why only two IPs ever do any work. (A sketch of passing executor settings to PySpark follows below the quoted message.)

M

> On Jul 17, 2015, at 9:16 PM, Charles Menguy wrote:
>
> I am trying to use PySpark on EMR to analyze some data stored as SequenceFiles on S3, but running into performance issues due to data locality. Here is a very simple sample that doesn't work well:
>
> seqRDD = sc.sequenceFile("s3n://:@//day=2015-07-04/hour=*/*")
> seqRDD.count()
>
> The issue is with the count action: it works fine, but distribution of the tasks is very poor. For some reason, in the Spark logs I only see 2 IPs of the cluster doing any actual work while the rest sits idle. I tried with a 5-node cluster and a 50-node cluster, and it's always only 2 IPs appearing in the logs.
>
> Also very strange is that these 2 IPs have a locality of RACK_LOCAL. I'm presuming it's because the data is in S3 so it's not local, but how can I make Spark use the whole cluster instead of just 2 instances?
>
> I didn't do anything specific for Spark configuration on EMR, simply installing it on EMR via native app, and I believe it takes care automatically of optimizing the configs. I ran PySpark with --master yarn-client
>
> I saw this in the logs; the allowLocal=false could be an issue but I couldn't find anything on that:
>
> 15/07/17 23:55:27 INFO spark.SparkContext: Starting job: count at :1
> 15/07/17 23:55:27 INFO scheduler.DAGScheduler: Got job 1 (count at :1) with 1354 output partitions (allowLocal=false)
> 15/07/17 23:55:27 INFO scheduler.DAGScheduler: Final stage: Stage 1(count at :1)
>
> Some logs that follow when running the count, showing only 2 IPs:
>
> 15/07/17 23:55:28 INFO scheduler.DAGScheduler: Submitting 1354 missing tasks from Stage 1 (PythonRDD[3] at count at :1)
> 15/07/17 23:55:28 INFO cluster.YarnScheduler: Adding task set 1.0 with 1354 tasks
> 15/07/17 23:55:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1418 bytes)
> 15/07/17 23:55:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:28 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-172-31-36-179.ec2.internal:39998 (size: 3.7 KB, free: 535.0 MB)
> 15/07/17 23:55:28 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-172-31-41-210.ec2.internal:36847 (size: 3.7 KB, free: 535.0 MB)
> 15/07/17 23:55:29 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-31-41-210.ec2.internal:36847 (size: 18.8 KB, free: 535.0 MB)
> 15/07/17 23:55:31 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 3, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1421 bytes)
> 15/07/17 23:55:31 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 3501 ms on ip-172-31-41-210.ec2.internal (1/1354)
> 15/07/17 23:55:31 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 1.0 (TID 4, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:31 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 3) in 99 ms on ip-172-31-41-210.ec2.internal (2/1354)
> 15/07/17 23:55:33 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 1.0 (TID 5, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:33 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 5190 ms on ip-172-31-36-179.ec2.internal (3/1354)
> 15/07/17 23:55:36 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 1.0 (TID 6, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:36 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 4471 ms on ip-172-31-41-210.ec2.internal (4/1354)
> 15/07/17 23:55:37 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 1.0 (TID 7, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:37 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 5) in 3676 ms on ip-172-31-36-179.ec2.internal (5/1354)
> 15/07/17 23:55:40 INFO scheduler.TaskSetManager: Starting task 7.0 in stage 1.0 (TID 8, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
> 15/07/17 23:55:40 INFO scheduler.TaskSetManager: Finished task 5.0 in stage 1.0 (TID 6) in 3895 ms on ip-172-31-41-210.ec2.internal (6/1354)
> 15/07/17 23:55:40 INFO scheduler.TaskSetManager: Starting task 8.0 in stage 1.0 (TID 9, ip-1
>
> I also tried eliminating S3 by distcp'ing the S3 data first into HDFS in the EMR cluster and then running a count() on that, but it doesn't make much difference: there are still only 2 IPs processing, and they initially start as NODE_LOCAL but eventually switch to RACK_LOCAL.
>
> I'm at a loss at what I have misconfigured; any help would be appreciated.
>
> Thanks !
>
> Charles
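For reference, a minimal sketch of how executor count and sizing can be passed when launching PySpark on YARN; the numbers are placeholders to illustrate the flags, not a recommendation for this cluster:

    # Placeholder values -- size these to your cluster.
    pyspark --master yarn-client \
      --num-executors 50 \
      --executor-cores 4 \
      --executor-memory 4g

    # Equivalent configuration-property form:
    pyspark --master yarn-client \
      --conf spark.executor.instances=50 \
      --conf spark.executor.cores=4 \
      --conf spark.executor.memory=4g

With enough executors requested, the 1354 input partitions should spread across the cluster instead of queuing on two nodes.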
Re: Spark 1.5.1 standalone cluster - wrong Akka remoting config?
Try setting spark.driver.host to the actual IP or hostname of the box submitting the work (a short sketch follows below the quoted message). More info in the networking section of this link: http://spark.apache.org/docs/latest/configuration.html

Also check the Spark config for your application for these driver settings in the application web UI at http://:4040 in the "Environment" tab. More info in the "viewing configuration properties" section of that link.

M

> On Oct 8, 2015, at 7:04 AM, baraky wrote:
>
> Doing my first steps with Spark, I'm facing problems submitting jobs to the cluster from the application code. Digging through the logs, I noticed some periodic WARN messages in the master log:
>
> 15/10/08 13:00:00 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@192.168.254.167:64014] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
>
> The problem is that this IP address does not exist on our network and wasn't configured anywhere. The same wrong IP is shown in the worker log when it tries to execute the task (the wrong IP is passed to --driver-url):
>
> 15/10/08 12:58:21 INFO worker.ExecutorRunner: Launch command: "/usr/java/latest//bin/java" "-cp" "/path/spark/spark-1.5.1-bin-hadoop2.6/sbin/../conf/:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/spark-assembly-1.5.1-hadoop2.6.0.jar:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/path/hadoop/2.6.0//etc/hadoop/" "-Xms1024M" "-Xmx1024M" "-Dspark.driver.port=64014" "-Dspark.driver.port=53411" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "akka.tcp://sparkDriver@192.168.254.167:64014/user/CoarseGrainedScheduler" "--executor-id" "39" "--hostname" "192.168.10.214" "--cores" "16" "--app-id" "app-20151008123702-0003" "--worker-url" "akka.tcp://sparkWorker@192.168.10.214:37625/user/Worker"
> 15/10/08 12:59:28 INFO worker.Worker: Executor app-20151008123702-0003/39 finished with state EXITED message Command exited with code 1 exitStatus 1
>
> Any idea what I did wrong and how this can be fixed?
>
> The Java version is 1.8.0_20, and I'm using pre-built Spark binaries.
>
> Thanks!
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-1-standalone-cluster-wrong-Akka-remoting-config-tp24978.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
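As a rough illustration of the suggestion above, a minimal sketch of pinning the driver address from application code; the master URL and host value are placeholders, and the host must be an address the workers can actually reach:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: replace the master URL and driver host with real values.
    val conf = new SparkConf()
      .setAppName("my-app")
      .setMaster("spark://master-host:7077")
      // The address of the submitting machine, as seen from the cluster.
      .set("spark.driver.host", "192.168.10.50")
    val sc = new SparkContext(conf)

The same property can be passed on the command line with --conf spark.driver.host=... if the job goes through spark-submit.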
Re: Scalable JDBCRDD
Jorn: Vertica

Cody: I posited the LIMIT just as an example of how jdbcrdd could be used least invasively. Let's say we partitioned on a time field -- we would still need N executions of those queries. The queries we have are very intense, and concurrency is an issue even if the N partitioned queries are smaller. Some queries require evaluating the whole data set first. If our use case were a simple select * from table, then the partitions would be an easier sell if it weren't for the concurrency problem :) Long story short -- we need only one execution of the query and would like to just divvy out the result set.

M

> On Mar 1, 2015, at 5:18 AM, Jörn Franke wrote:
>
> What database are you using?
>
> On 28 Feb 2015 18:15, "Michal Klos" wrote:
>> Hi Spark community,
>>
>> We have a use case where we need to pull huge amounts of data from a SQL query against a database into Spark. We need to execute the query against our huge database and not a substitute (SparkSQL, Hive, etc) because of a couple of factors, including custom functions used in the queries that only our database has.
>>
>> We started by looking at JDBC RDD, which utilizes a prepared statement with two parameters that are meant to be used to partition the result set to the workers... e.g.:
>>
>> select * from table limit ?,?
>>
>> turns into
>>
>> select * from table limit 1,100 on worker 1
>> select * from table limit 101,200 on worker 2
>>
>> This will not work for us because our database cannot support multiple executions of these queries without being crippled. But, additionally, our database doesn't support the above LIMIT syntax and we don't have a generic way of partitioning the various queries.
>>
>> As a result -- we started by forking JDBCRDD and made a version that executes the SQL query once in getPartitions into a Vector and then hands each worker node an index and iterator. Here's a snippet of getPartitions and compute:
>>
>> override def getPartitions: Array[Partition] = {
>>   // Compute the DB query once here
>>   val results = computeQuery
>>
>>   (0 until numPartitions).map(i => {
>>     // TODO: would be better to do this partitioning when scrolling
>>     // through result set if still loading into memory
>>     val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>>     new DBPartition(i, partitionItems)
>>   }).toArray
>> }
>>
>> override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
>>   val part = thePart.asInstanceOf[DBPartition[T]]
>>
>>   // Shift the result vector to our index number and then do a sliding iterator over it
>>   val iterator = part.items.iterator
>>
>>   override def getNext: T = {
>>     if (iterator.hasNext) {
>>       iterator.next()
>>     } else {
>>       finished = true
>>       null.asInstanceOf[T]
>>     }
>>   }
>>
>>   override def close: Unit = ()
>> }
>>
>> This is a little better since we can just execute the query once. However, the result set needs to fit in memory.
>>
>> We've been trying to brainstorm a way to
>>
>> A) have that result set distribute out to the worker RDD partitions as it's streaming in from the cursor?
>> B) have the result set spill to disk if it exceeds memory and do something clever around the iterators?
>> C) something else?
>>
>> We're not familiar enough yet with all of the workings of Spark to know how to proceed on this.
>>
>> We also thought of the work-around of having the DB query dump to HDFS/S3 and then pick it up from there, but it adds more moving parts and latency to our processing.
>>
>> Does anyone have a clever suggestion? Are we missing something?
>>
>> thanks,
>> Michal
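For contrast with the forked version above, this is roughly how the stock JdbcRDD is driven -- each partition binds its own bounds into the two placeholders and runs its own copy of the query, which is exactly the multiple-execution behavior being avoided here. The connection string, table, and columns are made-up illustrations, and an existing SparkContext sc is assumed:

    import java.sql.DriverManager
    import org.apache.spark.rdd.JdbcRDD

    // Sketch of stock JdbcRDD usage; every identifier below is hypothetical.
    val rdd = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:vertica://db-host:5433/db", "user", "pass"),
      "SELECT id, payload FROM events WHERE id >= ? AND id <= ?",
      lowerBound = 1L,
      upperBound = 1000000L,
      numPartitions = 20,
      mapRow = rs => (rs.getLong("id"), rs.getString("payload")))

    rdd.count()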
Re: Scalable JDBCRDD
Yes, exactly. The temp table is an approach, but then we need to manage its deletion, etc. I'm sure we won't be the only people with this crazy use case. If there isn't a feasible way to do this "within the framework" then that's okay. But if there is a way, we are happy to write the code and PR it back :) (A rough sketch of the staging-table idea follows below the quoted thread.)

M

> On Mar 1, 2015, at 10:02 AM, eric wrote:
>
> What you're saying is that, due to the intensity of the query, you need to run a single query and partition the results, versus running one query for each partition.
>
> I assume it's not viable to throw the query results into another table in your database and then query that using the normal approach?
>
> --eric
>
>> On 3/1/15 4:28 AM, michal.klo...@gmail.com wrote:
>> Jorn: Vertica
>>
>> Cody: I posited the LIMIT just as an example of how jdbcrdd could be used least invasively. Let's say we partitioned on a time field -- we would still need N executions of those queries. The queries we have are very intense, and concurrency is an issue even if the N partitioned queries are smaller. Some queries require evaluating the whole data set first. If our use case were a simple select * from table, then the partitions would be an easier sell if it weren't for the concurrency problem :) Long story short -- we need only one execution of the query and would like to just divvy out the result set.
>>
>> M
>>
>> On Mar 1, 2015, at 5:18 AM, Jörn Franke wrote:
>>
>>> What database are you using?
>>>
>>> On 28 Feb 2015 18:15, "Michal Klos" wrote:
>>>> Hi Spark community,
>>>>
>>>> We have a use case where we need to pull huge amounts of data from a SQL query against a database into Spark. We need to execute the query against our huge database and not a substitute (SparkSQL, Hive, etc) because of a couple of factors, including custom functions used in the queries that only our database has.
>>>>
>>>> We started by looking at JDBC RDD, which utilizes a prepared statement with two parameters that are meant to be used to partition the result set to the workers... e.g.:
>>>>
>>>> select * from table limit ?,?
>>>>
>>>> turns into
>>>>
>>>> select * from table limit 1,100 on worker 1
>>>> select * from table limit 101,200 on worker 2
>>>>
>>>> This will not work for us because our database cannot support multiple executions of these queries without being crippled. But, additionally, our database doesn't support the above LIMIT syntax and we don't have a generic way of partitioning the various queries.
>>>>
>>>> As a result -- we started by forking JDBCRDD and made a version that executes the SQL query once in getPartitions into a Vector and then hands each worker node an index and iterator. Here's a snippet of getPartitions and compute:
>>>>
>>>> override def getPartitions: Array[Partition] = {
>>>>   // Compute the DB query once here
>>>>   val results = computeQuery
>>>>
>>>>   (0 until numPartitions).map(i => {
>>>>     // TODO: would be better to do this partitioning when scrolling
>>>>     // through result set if still loading into memory
>>>>     val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>>>>     new DBPartition(i, partitionItems)
>>>>   }).toArray
>>>> }
>>>>
>>>> override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
>>>>   val part = thePart.asInstanceOf[DBPartition[T]]
>>>>
>>>>   // Shift the result vector to our index number and then do a sliding iterator over it
>>>>   val iterator = part.items.iterator
>>>>
>>>>   override def getNext: T = {
>>>>     if (iterator.hasNext) {
>>>>       iterator.next()
>>>>     } else {
>>>>       finished = true
>>>>       null.asInstanceOf[T]
>>>>     }
>>>>   }
>>>>
>>>>   override def close: Unit = ()
>>>> }
>>>>
>>>> This is a little better since we can just execute the query once. However, the result set needs to fit in memory.
>>>> We've been trying to brainstorm a way to
>>>> A) have that result set distribute out to the worker RDD partitions as it's streaming in from the cursor?
>>>> B) have the result set spill to disk if it exceeds memory and do something clever around the iterators?
>>>> C) something else?
>>>> We're not familiar enough yet with all of the workings of Spark to know how to proceed on this.
>>>> We also thought of the work-around of having the DB query dump to HDFS/S3 and then pick it up from there, but it adds more moving parts and latency to our processing.
>>>> Does anyone have a clever suggestion? Are we missing something?
>>>> thanks,
>>>> Michal
>
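Purely as a sketch of the staging-table idea eric raises (and the cleanup burden the reply mentions): materialize the expensive query once into a throwaway table keyed by a row number, then let the stock JdbcRDD partition that table cheaply. Every identifier here is hypothetical, and an existing SparkContext sc is assumed:

    import java.sql.DriverManager
    import org.apache.spark.rdd.JdbcRDD

    val url = "jdbc:vertica://db-host:5433/db"   // placeholder connection details

    // 1) Run the expensive query once, tagging rows with a row number for range partitioning.
    val conn = DriverManager.getConnection(url, "user", "pass")
    conn.createStatement().execute(
      """CREATE TABLE staging_results AS
        |SELECT ROW_NUMBER() OVER (ORDER BY some_column) AS rn, q.*
        |FROM ( /* the intense query goes here */ SELECT * FROM events ) q""".stripMargin)
    conn.close()

    // 2) Range scans over the staging table are cheap enough to run once per partition.
    val rdd = new JdbcRDD(
      sc,
      () => DriverManager.getConnection(url, "user", "pass"),
      "SELECT * FROM staging_results WHERE rn >= ? AND rn <= ?",
      1L, 10000000L, 20,
      rs => rs.getString("payload"))   // hypothetical column

    // 3) DROP TABLE staging_results once the job finishes -- this is the
    //    lifecycle management the reply would rather not own.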
Re: Job submission API
A SparkContext can submit jobs remotely. The spark-submit options can generally be populated into a SparkConf and passed in when you create a SparkContext (a minimal sketch follows below the quoted message). We personally have not had much success with yarn-client remote submission, but standalone cluster mode was easy to get going.

M

> On Apr 7, 2015, at 7:01 PM, Prashant Kommireddi wrote:
>
> Hello folks,
>
> Newbie here! Just had a quick question - is there a job submission API, such as the one with Hadoop (https://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/mapreduce/Job.html#submit()), to submit Spark jobs to a YARN cluster? I see in the examples that bin/spark-submit is what's out there, but I couldn't find any APIs around it.
>
> Thanks,
> Prashant
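A minimal sketch of the programmatic route described in the reply: populate the options you would otherwise pass to spark-submit into a SparkConf and build a SparkContext directly. The master URL and jar path are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("remote-submit-example")
      .setMaster("spark://master-host:7077")          // placeholder standalone master
      .set("spark.executor.memory", "2g")
      .setJars(Seq("/path/to/my-app-assembly.jar"))   // placeholder: ships your classes to executors

    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 1000).sum())          // runs on the cluster
    sc.stop()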
Re: Querying Cluster State
Not sure if there's a Spark-native way, but we've been using Consul for this. (A rough sketch of what a Consul health query looks like follows below the quoted thread.)

M

> On Apr 26, 2015, at 5:17 AM, James King wrote:
>
> Thanks for the response.
>
> But no, this does not answer the question.
>
> The question was: is there a way (via some API call) to query the number and type of daemons currently running in the Spark cluster?
>
> Regards
>
>> On Sun, Apr 26, 2015 at 10:12 AM, ayan guha wrote:
>> In my limited understanding, there must be a single "leader" master in the cluster. If there are multiple leaders, it will lead to an unstable cluster, as each master will keep scheduling independently. You should use ZooKeeper for HA, so that standby masters can vote to find a new leader if the primary goes down.
>>
>> Now, you can still have multiple masters running as leaders, but conceptually they should be thought of as different clusters.
>>
>> Regarding workers, they should follow their master.
>>
>> Not sure if this answers your question, as I am sure you have read the documentation thoroughly.
>>
>> Best
>> Ayan
>>
>>> On Sun, Apr 26, 2015 at 6:31 PM, James King wrote:
>>> If I have 5 nodes and I wish to maintain 1 Master and 2 Workers on each node, then in total I will have 5 Masters and 10 Workers.
>>>
>>> Now, to maintain that setup I would like to query Spark regarding the number of Masters and Workers that are currently available using API calls, and then take some appropriate action based on the information I get back, like restarting a dead Master or Worker.
>>>
>>> Is this possible? Does Spark provide such an API?
>>
>> --
>> Best Regards,
>> Ayan Guha
>
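Purely as an illustration of the Consul route mentioned above: this assumes each master and worker host registers itself as a Consul service (names like spark-master and spark-worker are our own convention for this sketch, not something Spark or Consul does automatically), after which the health API answers the "how many, and which ones" question:

    # Healthy masters currently registered (hypothetical service name):
    curl http://localhost:8500/v1/health/service/spark-master?passing

    # Healthy workers currently registered:
    curl http://localhost:8500/v1/health/service/spark-worker?passing

Counting the returned entries, or reacting when one disappears, is then an ordinary monitoring job outside of Spark.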
Re: submitting to multiple masters
According to the docs, the list should use a single spark:// prefix followed by comma-separated host:port pairs:

spark://host1:port1,host2:port2

https://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper

(A corrected form of your command is sketched below the quoted message.)

Thanks
M

> On Apr 28, 2015, at 8:13 AM, James King wrote:
>
> I have multiple masters running and I'm trying to submit an application using
>
> spark-1.3.0-bin-hadoop2.4/bin/spark-submit
>
> with this config (i.e. a comma-separated list of master URLs):
>
> --master spark://master01:7077,spark://master02:7077
>
> But I'm getting this exception:
>
> Exception in thread "main" org.apache.spark.SparkException: Invalid master URL: spark://spark://master02:7077
>
> What am I doing wrong?
>
> Many Thanks
> jk
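Concretely, for the command in the quoted message that would look like the following; the --class and application jar are placeholders, and only the --master value is the point:

    spark-1.3.0-bin-hadoop2.4/bin/spark-submit \
      --master spark://master01:7077,master02:7077 \
      --class com.example.MyApp \
      /path/to/my-app.jar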
Re: How to get Master UI with ZooKeeper HA setup?
I've been querying ZooKeeper directly via the ZooKeeper client tools; it has the IP of the current master leader in the master_status data (a rough sketch follows below the quoted message). We are also running Exhibitor for ZooKeeper, which has a nice UI for exploring, if you want to look it up manually.

Thanks,
Michal

> On May 12, 2015, at 1:28 AM, Rex Xiong wrote:
>
> Hi,
>
> We have a 3-node master setup with ZooKeeper HA.
> The driver can find the master with spark://xxx:xxx,xxx:xxx,xxx:xxx
> But how can I find out the valid Master UI without looping through all 3 nodes?
>
> Thanks
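A rough sketch of the ZooKeeper lookup described above, assuming the default recovery directory of /spark (spark.deploy.zookeeper.dir); the exact znode layout varies by version and configuration, so treat the paths as illustrative:

    # Connect to any member of the ZooKeeper ensemble:
    zkCli.sh -server zk1:2181

    # Spark standalone HA keeps its state under the recovery directory:
    ls /spark
    ls /spark/master_status

    # Inspecting a child znode shows the host/port of the node it describes,
    # which is how the current leader's UI address can be resolved:
    get /spark/master_status/<child-znode>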