If you open up the driver UI (running on port 4040), you can see multiple tasks per stage running concurrently. If a stage has only a single task and you want to increase the parallelism, you can simply repartition the RDD.
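For example, something along these lines (just a sketch; the 42 is only an illustrative partition count, pick one that suits your cluster and data size):

// Check how many partitions (and hence tasks per stage) the RDD has,
// and repartition if the number is too low to keep all executors busy.
val db = sc.textFile("/data/db.txt")
println("partitions before: " + db.partitions.length)

val dbRepartitioned = db.repartition(42)   // triggers a shuffle; pick a sensible count
println("partitions after: " + dbRepartitioned.partitions.length)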
Thanks
Best Regards

On Tue, May 26, 2015 at 8:27 AM, <luohui20...@sina.com> wrote:

> I am trying to run some shell scripts in my Spark app, hoping they run
> more concurrently in my Spark cluster.
>
> However, I am not sure whether my code will run concurrently on my
> executors.
>
> Diving into my code, you can see that I am trying to:
>
> 1. Split both db and sample into 21 small files, generating 42 files in
> total. Splitting db gives "chr1", "chr2", ..., "chr21", and splitting
> sample gives "samplechr1", "samplechr2", ..., "samplechr21".
>
> 2. Merge those split files from HDFS and save them to a local path. The
> merged files will be "chr1.txt", "chr2.txt", ..., "chr21.txt" and
> "samplechr1.txt", "samplechr2.txt", ..., "samplechr21.txt".
>
> 3. Run modify.sh to clean the data, that is, delete some characters that
> are not useful.
>
> 4. Run shellcompare.sh to compare chr1.txt and samplechr1.txt, producing
> result1.txt. Looping from 1 to 21 compares all 42 files and gives me 21
> files: result1.txt, result2.txt, ..., result21.txt.
>
> Sorry for not adding some comments to my code.
>
> --------------------------------
> Thanks & Best regards!
> San.Luo
>
> ----- Original Message -----
> From: Akhil Das <ak...@sigmoidanalytics.com>
> To: 罗辉 <luohui20...@sina.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Re: Re: how to distributed run a bash shell in spark
> Date: 2015-05-25 22:41
>
> Can you tell us what exactly you are trying to achieve?
>
> Thanks
> Best Regards
>
> On Mon, May 25, 2015 at 5:00 PM, <luohui20...@sina.com> wrote:
>
> Thanks, madhu and Akhil.
>
> I modified my code as below; however, I think it is still not very
> distributed. Do you have a better idea to run this app more efficiently
> and in a more distributed way?
>
> I added some comments with my understanding:
>
> import org.apache.spark._
> import www.celloud.com.model._
>
> object GeneCompare3 {
>   def main(args: Array[String]) {
>     val conf = new SparkConf()
>       .setAppName("GeneCompare")
>       .setMaster("spark://master:7077")
>       .set("spark.executor.memory", "6g")
>       .set("hive.metastore.warehouse.dir", "/user/hive/warehouse")
>     val sc = new SparkContext(conf)
>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>     import sqlContext.implicits._
>
>     val db = sc.textFile("/data/db.txt").map(_.split("\t")).map(x =>
>       Database(x(0),
>         x(1).trim().toInt,
>         x(2).trim().toInt,
>         x(3).trim().toInt,
>         x(4).trim().toInt)).toDF()
>     val sample = sc.textFile("/data/sample.txt").map(_.split("\t")).map(x =>
>       Sample(x(0),
>         x(1).trim().toInt,
>         x(2),
>         x(3).trim().toInt,
>         x(4).trim().toInt,
>         x(5))).toDF()
>
>     // using Akhil Das's idea; 1to21.txt is a file with 21 lines, each line a single number
>     // val test = sc.textFile("1to21.txt").foreach { i =>                                // running on the driver
>     //   db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i)       // running on an executor
>     //   db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt")
>     //   sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
>     //   db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt")  // running on an executor
>     // }.collect()                                                                       // running on the driver
>
>     // using madhu's method
>     // running on the driver
>     for (i <- 1 to 21) {
>       db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i)
>       db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt").collect()
>       sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
>       db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt").collect()
>     }
>
>     // running on the driver
>     val runmodifyshell = List("run", "modify.sh")
>     val runmodifyshellRDD = sc.makeRDD(runmodifyshell)
>     val pipeModify = runmodifyshellRDD.pipe("sh /opt/data/shellcompare/modify.sh")
>     pipeModify.collect()
>
>     // running on the driver
>     val shellcompare = List("run", "sort.sh")
>     val shellcompareRDD = sc.makeRDD(shellcompare)
>     val result = List("aggregate", "result")
>     val resultRDD = sc.makeRDD(result)
>     for (j <- 1 to 21) {
>       shellcompareRDD.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/chr" + j + ".txt /opt/data/shellcompare/samplechr" + j + ".txt /opt/data/shellcompare/result" + j + ".txt 600").collect()
>       if (j > 1) resultRDD.pipe("cat result" + j + ".txt >> result1.txt").collect()
>     }
>   }
> }
>
> And I know there are some problems in my code, for example "modify.sh"
> will not be executed.
>
> --------------------------------
> Thanks & Best regards!
> San.Luo
>
> ----- Original Message -----
> From: madhu phatak <phatak....@gmail.com>
> To: luohui20...@sina.com
> Cc: Akhil Das <ak...@sigmoidanalytics.com>, user <user@spark.apache.org>
> Subject: Re: Re: how to distributed run a bash shell in spark
> Date: 2015-05-25 14:11
>
> Hi,
> You can use the pipe operator if you are running a shell/perl script over
> some data. More information on my blog
> <http://blog.madhukaraphatak.com/pipe-in-spark/>.
>
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>
> On Mon, May 25, 2015 at 8:02 AM, <luohui20...@sina.com> wrote:
>
> Thanks Akhil,
>
> Your code is a big help to me, because a perl script is exactly the thing
> I want to run in Spark. I will give it a try.
>
> --------------------------------
> Thanks & Best regards!
> San.Luo
>
> ----- Original Message -----
> From: Akhil Das <ak...@sigmoidanalytics.com>
> To: 罗辉 <luohui20...@sina.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: how to distributed run a bash shell in spark
> Date: 2015-05-25 00:53
>
> You mean you want to execute some shell commands from Spark? Here's
> something I tried a while back: https://github.com/akhld/spark-exploit
>
> Thanks
> Best Regards
>
> On Sun, May 24, 2015 at 4:53 PM, <luohui20...@sina.com> wrote:
>
> hello there
>
> I am trying to run an app in which part of it needs to run a shell
> script. How can I run a shell script distributed across a Spark cluster?
> Thanks.
> Here's my code:
>
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.DataFrame;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SQLContext;
> import org.apache.spark.sql.types.DataType;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
>
> public class ShellCompare {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf()
>                 .setAppName("ShellCompare")
>                 .setMaster("spark://master:7077")
>                 .set("spark.executor.memory", "6g");
>         JavaSparkContext sc = new JavaSparkContext(conf);
>
>         for (int i = 1; i <= 21; i++) {
>             execShell(i);
>         }
>         // execShell(1);
>         sc.stop();
>     }
>
>     private static void execShell(int i) {
>         String shpath = "/opt/sh/bin/sort.sh";
>         Process process = null;
>
>         String var = "/opt/data/shellcompare/chr" + i + ".txt"
>                 + " /opt/data/shellcompare/samplechr" + i + ".txt"
>                 + " /opt/data/shellcompare/result.txt 600";
>         // String var = "/opt/data/chr1.txt /opt/data/chr1sample.txt /opt/sh/bin/result.txt 600";
>         String command2 = "sh " + shpath + " " + var;
>         try {
>             // launches the script as a child process of the driver and waits for it
>             process = Runtime.getRuntime().exec(command2);
>             process.waitFor();
>         } catch (InterruptedException e1) {
>             e1.printStackTrace();
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
>     }
> }
>
> --------------------------------
> Thanks & Best regards!
> San.Luo
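To push the 21 sort.sh comparisons onto the executors instead of running them one after another on the driver (as the ShellCompare code above does), one option is to parallelize the chromosome indices and launch the script from inside a map. This is only a rough, untested sketch, and it assumes that sort.sh and the merged chrN.txt / samplechrN.txt files exist at the same local paths on every worker node:

import scala.sys.process._

// Each of the 21 indices becomes one task, so the 21 comparisons can run
// concurrently across the cluster (one sort.sh invocation per task).
val exitCodes = sc.parallelize(1 to 21, 21).map { i =>
  // assumes these files are present on this worker's local disk
  val cmd = Seq("sh", "/opt/sh/bin/sort.sh",
    "/opt/data/shellcompare/chr" + i + ".txt",
    "/opt/data/shellcompare/samplechr" + i + ".txt",
    "/opt/data/shellcompare/result" + i + ".txt",
    "600")
  (i, cmd.!)   // runs on whichever executor processes this element; returns the exit code
}.collect()

exitCodes.filter(_._2 != 0).foreach { case (i, code) =>
  println("sort.sh failed for chr" + i + " with exit code " + code)
}

Note that each result file then ends up on the local disk of whichever worker ran that index, so you would still need to gather them (for example by having sort.sh write to HDFS or a shared mount) before concatenating them into a single result file.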