Thanks, Madhu and Akhil.
I modified my code as shown below; however, I don't think it is very distributed. Do
you have a better idea for running this app more efficiently and in a more distributed way?
I have added some comments with my understanding:
import org.apache.spark._
import www.celloud.com.model._
object GeneCompare3 {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("GeneCompare")
      .setMaster("spark://master:7077")
      .set("spark.executor.memory", "6g")
      .set("hive.metastore.warehouse.dir", "/user/hive/warehouse")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val db = sc.textFile("/data/db.txt").map(_.split("\t")).map(x =>
      Database(x(0),
        x(1).trim().toInt,
        x(2).trim().toInt,
        x(3).trim().toInt,
        x(4).trim().toInt)).toDF()
    val sample = sc.textFile("/data/sample.txt").map(_.split("\t")).map(x =>
      Sample(x(0),
        x(1).trim().toInt,
        x(2),
        x(3).trim().toInt,
        x(4).trim().toInt,
        x(5))).toDF()
    //using Akhil Das's idea, 1to21 is a file with 21 lines, each line is a single number
    // val test = sc.textFile("1to21.txt").foreach{i => //running on driver manager
    //   db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i) //running on driver executor
    //   db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt")
    //   sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
    //   db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt") //running on driver executor
    // }.collect() //running on driver manager
    //using madhu's method
    //running on driver manager
    for (i <- 1 to 21) {
      db.filter("chrname = 'chr" + i + "'").rdd.saveAsTextFile("/data/chr" + i)
      db.rdd.pipe("hadoop fs -getmerge /data/chr" + i + " /opt/data/shellcompare/chr" + i + ".txt").collect()
      sample.filter("name = 'chr" + i + "'").rdd.saveAsTextFile("/data/samplechr" + i)
      db.rdd.pipe("hadoop fs -getmerge /data/samplechr" + i + " /opt/data/shellcompare/samplechr" + i + ".txt").collect()
    }
    //running on driver manager
    val runmodifyshell = List("run", "modify.sh")
    val runmodifyshellRDD = sc.makeRDD(runmodifyshell)
    val pipeModify = runmodifyshellRDD.pipe("sh /opt/data/shellcompare/modify.sh")
    pipeModify.collect()
    //running on driver manager
    val shellcompare = List("run", "sort.sh")
    val shellcompareRDD = sc.makeRDD(shellcompare)
    val result = List("aggregate", "result")
    val resultRDD = sc.makeRDD(result)
    for (j <- 1 to 21) {
      shellcompareRDD.pipe("sh /opt/sh/bin/sort.sh /opt/data/shellcompare/chr" + j +
        ".txt /opt/data/shellcompare/samplechr" + j +
        ".txt /opt/data/shellcompare/result" + j + ".txt 600").collect()
      if (j > 1) resultRDD.pipe("cat result" + j + ".txt >> result1.txt").collect()
    }
  }
}
I know there are some problems in my code; for example, "modify.sh" is not
executed.
--------------------------------
Thanks&Best regards!
San.Luo
----- Original Message -----
From: madhu phatak <[email protected]>
To: [email protected]
Cc: Akhil Das <[email protected]>, user <[email protected]>
Subject: Re: Re: how to distributed run a bash shell in spark
Date: May 25, 2015, 14:11
Hi,
You can use the pipe operator if you are running a shell script or Perl script on
some data. More information is on my blog.
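To make the pipe idea concrete, here is a rough sketch in plain Java (no Spark involved) of what pipe does for a single partition: each element is written as one line to the external command's stdin, and each line the command prints on stdout becomes one output element. `PipeSketch` and its `pipe` method are hypothetical names made up for this illustration, and `sort` only stands in for a real shell or Perl script. As far as I know, Spark's `pipe(String)` also splits the command on whitespace rather than handing it to a shell, so shell syntax such as `>>` inside the command string is passed along as plain arguments.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spark: imitates pipe for one partition.
final class PipeSketch {

    // Writes each element as one line to the command's stdin and
    // returns the lines the command prints on stdout.
    static List<String> pipe(List<String> partition, List<String> command)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .redirectError(ProcessBuilder.Redirect.INHERIT)
                .start();

        // Feed stdin from a separate thread so that reading stdout below
        // cannot deadlock against a full pipe buffer.
        Thread writer = new Thread(() -> {
            try (BufferedWriter in = new BufferedWriter(new OutputStreamWriter(
                    process.getOutputStream(), StandardCharsets.UTF_8))) {
                for (String element : partition) {
                    in.write(element);
                    in.newLine();
                }
            } catch (IOException e) {
                // The command may exit before reading all input; ignored in this sketch.
            }
        });
        writer.start();

        List<String> output = new ArrayList<>();
        try (BufferedReader out = new BufferedReader(new InputStreamReader(
                process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = out.readLine()) != null) {
                output.add(line);
            }
        }
        writer.join();
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IOException("command exited with status " + exitCode);
        }
        return output;
    }

    public static void main(String[] args) throws Exception {
        List<String> partition = Arrays.asList("chr2\t10", "chr1\t20");
        // "sort" stands in for a real shell or Perl script.
        System.out.println(pipe(partition, Arrays.asList("sort")));
    }
}
```

In Spark itself the same thing happens once per partition on the executors, so the script and any files it reads have to be available on every worker node.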
Regards,
Madhukara Phatak
http://datamantra.io/
On Mon, May 25, 2015 at 8:02 AM, <[email protected]> wrote:
Thanks Akhil,
your code is a big help to me, because a Perl script is exactly the
thing I want to try to run in Spark. I will give it a try.
--------------------------------
Thanks&Best regards!
San.Luo
----- Original Message -----
From: Akhil Das <[email protected]>
To: 罗辉 <[email protected]>
Cc: user <[email protected]>
Subject: Re: how to distributed run a bash shell in spark
Date: May 25, 2015, 00:53
You mean you want to execute some shell commands from Spark? Here's something I
tried a while back: https://github.com/akhld/spark-exploit
Thanks
Best Regards
On Sun, May 24, 2015 at 4:53 PM, <[email protected]> wrote:
Hello there, I am trying to run an app, part of which needs to run a shell
script. How can I run a shell script in a distributed way on a Spark cluster? Thanks.
Here's my code:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class ShellCompare {
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        SparkConf conf = new SparkConf()
                .setAppName("ShellCompare")
                .setMaster("spark://master:7077")
                .set("spark.executor.memory", "6g");
        JavaSparkContext sc = new JavaSparkContext(conf);
        for (int i = 1; i <= 21; i++) {
            execShell(i);
        }
        // execShell(1);
        sc.stop();
    }
    private static void execShell(int i) {
        String shpath = "/opt/sh/bin/sort.sh";
        Process process = null;
        String var = "/opt/data/shellcompare/chr" + i + ".txt /opt/data/shellcompare/samplechr" + i
                + ".txt /opt/data/shellcompare/result.txt 600";
        // String var = "/opt/data/chr1.txt /opt/data/chr1sample.txt /opt/sh/bin/result.txt 600";
        String command2 = "sh " + shpath + " " + var;
        try {
            process = Runtime.getRuntime().exec(command2);
            process.waitFor();
        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
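A note on the code above: execShell is called from an ordinary loop in main, so every script run happens in the driver JVM and nothing is sent to the executors. It also ignores the script's exit status and never reads its output, which can make waitFor() block if the script prints a lot. The sketch below only addresses the second point, under the assumption that sort.sh takes the same four arguments as above; `ShellRunner`, `sortCommand`, and `run` are hypothetical names used only for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical variant of execShell; the names here are illustrative only.
final class ShellRunner {

    // Builds the argument list for one chromosome, reusing the paths from the code above.
    static List<String> sortCommand(int i) {
        String dir = "/opt/data/shellcompare/";
        return Arrays.asList("sh", "/opt/sh/bin/sort.sh",
                dir + "chr" + i + ".txt",
                dir + "samplechr" + i + ".txt",
                dir + "result.txt",
                "600");
    }

    // Runs the command, echoes its output, and returns its exit status.
    static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .redirectErrorStream(true) // merge stderr into stdout
                .start();
        // Drain the output so the child process cannot block on a full pipe buffer.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        for (int i = 1; i <= 21; i++) {
            int status = run(sortCommand(i));
            if (status != 0) {
                System.err.println("sort.sh failed for chr" + i + " with status " + status);
            }
        }
    }
}
```

Passing the arguments as a list avoids relying on whitespace splitting of one long command string. This still runs everything on one machine; distributing the work would mean moving the per-chromosome call inside an RDD operation.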
--------------------------------
Thanks&Best regards!
San.Luo