Can you can tell us what exactly you are trying to achieve?

Thanks
Best Regards

On Mon, May 25, 2015 at 5:00 PM, <luohui20...@sina.com> wrote:

> thanks, madhu and Akhil
>
> I modified my code like below,however I think it is not so distributed.
> Have you guys better idea to run this app more efficiantly and distributed?
>
> So I add some comments with my understanding:
>
> import org.apache.spark._
> import www.celloud.com.model._
>
> object GeneCompare3 {
>   def main(args: Array[String]) {
>     val conf = new
> SparkConf().setAppName("GeneCompare").setMaster("spark://master:7077").set("spark.executor.memory",
> "6g").set("hive.metastore.warehouse.dir", "/user/hive/warehouse")
>     val sc = new SparkContext(conf)
>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>     import sqlContext.implicits._
>
>     val db = sc.textFile("/data/db.txt").map(_.split("\t")).map(x =>
> Database(x(0),
>       x(1).trim().toInt,
>       x(2).trim().toInt,
>       x(3).trim().toInt,
>       x(4).trim().toInt)).toDF()
>     val sample = sc.textFile("/data/sample.txt").map(_.split("\t")).map(x
> => Sample(x(0),
>       x(1).trim().toInt,
>       x(2),
>       x(3).trim().toInt,
>       x(4).trim().toInt,
>       x(5))).toDF()
>
>       //using Akhil Das's idea,1to21 is a file with 21lines,each line is a
> single number
> //    val test = sc.textfile("1to21.txt").foreach{i
> =>                                       //running on driver manager
> //      db.filter("chrname = 'chr" + i +
> "'").rdd.saveAsTextFile("/data/chr" + i)                //running on driver
> executor
> //      db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + "
> /opt/data/shellcompare/chr" + i + ".txt")
> //      sample.filter("name = 'chr" + i +
> "'").rdd.saveAsTextFile("/data/samplechr" + i)
> //      db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + "
> /opt/data/shellcompare/samplechr" + i + ".txt")   //running on driver
> executor
> //    }.collect()              //running on driver manager
>
>       //using madhu's method
>
>       //running on driver manager
>
>     for (i <- 1 to 21) {
>       db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr"
> + i)
>       db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + "
> /opt/data/shellcompare/chr" + i + ".txt").collect()
>       sample.filter("name = 'chr" + i +
> "'").rdd.saveAsTextFile("/data/samplechr" + i)
>       db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + "
> /opt/data/shellcompare/samplechr" + i + ".txt").collect()
>     }
>
>
>
>     //running on driver manager
>
>     val runmodifyshell=List("run","modify.sh")
>     val runmodifyshellRDD = sc.makeRDD(runmodifyshell)
>     val pipeModify = runmodifyshellRDD.pipe("sh
> /opt/data/shellcompare/modify.sh")
>
>     pipeModify.collect()
>
>
>
>     //running on driver manager
>     val shellcompare = List("run","sort.sh")
>     val shellcompareRDD = sc.makeRDD(shellcompare)
>     val result = List("aggregate","result")
>     val resultRDD = sc.makeRDD(result)
>     for(j <- 1 to 21){
>       shellcompareRDD.pipe("sh /opt/sh/bin/sort.sh
> /opt/data/shellcompare/chr" + j + ".txt /opt/data/shellcompare/samplechr" +
> j + ".txt /opt/data/shellcompare/result" + j + ".txt 600").collect()
>       if (j > 1) resultRDD.pipe("cat result" + j + ".txt >>
> result1.txt").collect()
>     }
>   }
> }
>
>
> And I know there are some problems in my code, such as "modify.sh" will
> not be executed.
>
> --------------------------------
>
> Thanks&amp;Best regards!
> San.Luo
>
> ----- 原始邮件 -----
> 发件人:madhu phatak <phatak....@gmail.com>
> 收件人:luohui20...@sina.com
> 抄送人:Akhil Das <ak...@sigmoidanalytics.com>, user <user@spark.apache.org>
> 主题:Re: Re: how to distributed run a bash shell in spark
> 日期:2015年05月25日 14点11分
>
> Hi,
> You can use pipe operator, if you are running shell script/perl script on
> some data. More information on my blog
> <http://blog.madhukaraphatak.com/pipe-in-spark/>.
>
>
>
>
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>
> On Mon, May 25, 2015 at 8:02 AM, <luohui20...@sina.com> wrote:
>
> Thanks Akhil,
>
>            your code is a big help to me,'cause perl script is the exactly
> thing i wanna try to run in spark. I will have a try.
>
>
> --------------------------------
>
> Thanks&amp;Best regards!
> San.Luo
>
> ----- 原始邮件 -----
> 发件人:Akhil Das <ak...@sigmoidanalytics.com>
> 收件人:罗辉 <luohui20...@sina.com>
> 抄送人:user <user@spark.apache.org>
> 主题:Re: how to distributed run a bash shell in spark
> 日期:2015年05月25日 00点53分
>
> You mean you want to execute some shell commands from spark? Here's
> something i tried a while back. https://github.com/akhld/spark-exploit
>
> Thanks
> Best Regards
>
> On Sun, May 24, 2015 at 4:53 PM, <luohui20...@sina.com> wrote:
>
> hello there
>
>       I am trying to run a app in which part of it needs to run a
> shell.how to run a shell distributed in spark cluster.thanks.
>
>
> here's my code:
>
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.DataFrame;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SQLContext;
> import org.apache.spark.sql.types.DataType;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
>
> public class ShellCompare {
>     public static void main(String[] args) {
>         // TODO Auto-generated method stub
>         SparkConf conf = new
> SparkConf().setAppName("ShellCompare").setMaster("spark://master:7077").set("spark.executor.memory",
> "6g");
>         JavaSparkContext sc = new JavaSparkContext(conf);
>
>         for(int i=1;i<=21;i++){
>             execShell(i);
>         }
> //        execShell(1);
>         sc.stop();
>     }
>
>     private static void execShell(int i) {
>         String shpath="/opt/sh/bin/sort.sh";
>         Process process =null;
>
>         String var="/opt/data/shellcompare/chr" + i +".txt
> /opt/data/shellcompare/samplechr" + i +".txt
> /opt/data/shellcompare/result.txt 600";
> //        String var="/opt/data/chr1.txt /opt/data/chr1sample.txt
> /opt/sh/bin/result.txt 600";
>         String command2 = "sh " + shpath + " " + var;
>         try {
>             process = Runtime.getRuntime().exec(command2);
>             process.waitFor();
>         } catch (InterruptedException e1) {
>             // TODO Auto-generated catch block
>             e1.printStackTrace();
>         } catch (IOException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>     }
>
> }
>
>
>
> --------------------------------
>
> Thanks&amp;Best regards!
> San.Luo
>
>
>
>

Reply via email to