Hi Richard, thanks much for the reply. If I don't create threads, the job runs too slowly, since I have about a thousand jobs, or a thousand Hive partition directories, to process. hiveContext.sql(...) runs fine and creates the output I expect, so do I really need to call any action method? The job works as expected; I am just hitting the physical memory limit, and YARN kills the executor (I saw it in the YARN logs). I believe that because of the group-by queries a lot of shuffle data moves around and creates the mess. Please guide.
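
From what I have read, the limit YARN enforces is on the whole executor container, heap plus off-heap overhead, so I am considering giving each executor more overhead headroom before touching the heap size. A minimal sketch of what I mean, with an illustrative value and assuming Spark 1.5 on YARN:

    SparkConf sparkConf = new SparkConf().setAppName("SparkTest");
    // Off-heap headroom per executor container, in MB (illustrative value);
    // YARN kills the container once heap plus overhead exceeds its physical limit.
    sparkConf.set("spark.yarn.executor.memoryOverhead", "1024");

Raising spark.sql.shuffle.partitions above the 15 I use now might also help, since with so few shuffle partitions each group-by task has to hold a larger slice of the data in memory.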
On Sun, Sep 13, 2015 at 2:09 AM, Richard W. Eggert II <richard.egg...@gmail.com> wrote:
>
> Without a stack trace, I can't say for certain what is causing your
> OutOfMemoryError, but I do see a number of problems with your code.
>
> First of all, given that Spark is a parallel processing framework, it is
> almost never necessary to manually create a thread pool within the driver.
> You should instead chain together some RDDs and let Spark parallelize the
> work on the cluster for you.
>
> Secondly, unless I'm mistaken, SQLContext.sql (which HiveContext inherits)
> does not actually execute your SQL query. It just creates a DataFrame that
> represents the query. You have to invoke one of DataFrame's "action"
> methods (such as count, collect, foreach, or saveAsTextFile) to cause Spark
> to create a job to actually execute the query. The documentation is
> admittedly a bit vague and misleading about this, however.
>
> Rich
>
> On September 12, 2015, at 3:52 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>
> Hi, I have the following Spark driver program/job which reads ORC files
> (i.e. Hive partitions as HDFS directories), processes them as DataFrames,
> and uses them as tables in hiveContext.sql(). The job runs fine and gives
> correct results, but it hits the physical memory limit after an hour or so,
> YARN kills executors, and things get slower and slower. Please see the
> following code and help me identify the problem. I create 20 threads from
> the driver program and spawn them; the thread logic contains lambda
> functions which get executed on the executors. Please guide me, I am new to
> Spark. Thanks much.
>
> public class DataSpark {
>
>     public static final Map<String, String> dMap = new LinkedHashMap<>();
>
>     public static final String[] colNameArr = new String[] {"_col0", "col2", "bla bla 45 columns"};
>
>     public static void main(String[] args) throws Exception {
>
>         Set<DataWorker> workers = new HashSet<>();
>
>         SparkConf sparkConf = new SparkConf().setAppName("SparkTest");
>         setSparkConfProperties(sparkConf);
>         SparkContext sc = new SparkContext(sparkConf);
>         final FileSystem fs = FileSystem.get(sc.hadoopConfiguration());
>         HiveContext hiveContext = createHiveContext(sc);
>
>         declareHiveUDFs(hiveContext);
>
>         DateTimeFormatter df = DateTimeFormat.forPattern("yyyyMMdd");
>         String yestday = "20150912";
>         hiveContext.sql(" use xyz ");
>         createTables(hiveContext);
>         DataFrame partitionFrame = hiveContext.sql(" show partitions data partition(date=\"" + yestday + "\")");
>
>         // add csv files to distributed cache
>         Row[] rowArr = partitionFrame.collect();
>         for (Row row : rowArr) {
>             String[] splitArr = row.getString(0).split("/");
>             String entity = splitArr[0].split("=")[1];
>             int date = Integer.parseInt(splitArr[1].split("=")[1]);
>
>             String sourcePath = "/user/xyz/Hive/xyz.db/data/entity=" + entity + "/date=" + date;
>             Path spath = new Path(sourcePath);
>             if (fs.getContentSummary(spath).getFileCount() > 0) {
>                 DataWorker worker = new DataWorker(hiveContext, entity, date);
>                 workers.add(worker);
>             }
>         }
>
>         ExecutorService executorService = Executors.newFixedThreadPool(20);
>         executorService.invokeAll(workers);
>         executorService.shutdown();
>
>         sc.stop();
>     }
>
>     private static void setSparkConfProperties(SparkConf sparkConf) {
>         sparkConf.set("spark.rdd.compress", "true");
>         sparkConf.set("spark.shuffle.consolidateFiles", "true");
>         sparkConf.set("spark.executor.logs.rolling.maxRetainedFiles", "90");
>         sparkConf.set("spark.executor.logs.rolling.strategy", "time");
>         sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
>         sparkConf.set("spark.shuffle.manager", "tungsten-sort");
>         sparkConf.set("spark.shuffle.memoryFraction", "0.5");
>         sparkConf.set("spark.storage.memoryFraction", "0.2");
>     }
>
>     private static HiveContext createHiveContext(SparkContext sc) {
>         HiveContext hiveContext = new HiveContext(sc);
>         hiveContext.setConf("spark.sql.codegen", "true");
>         hiveContext.setConf("spark.sql.unsafe.enabled", "true");
>         // need to set this to avoid a large number of small files; by default Spark creates 200 output part files
>         hiveContext.setConf("spark.sql.shuffle.partitions", "15");
>         hiveContext.setConf("spark.sql.orc.filterPushdown", "true");
>         return hiveContext;
>     }
>
>     private static void declareHiveUDFs(HiveContext hiveContext) {
>         hiveContext.sql("CREATE TEMPORARY FUNCTION UDF1 AS 'com.blab.blab.UDF1'");
>         hiveContext.sql("CREATE TEMPORARY FUNCTION UDF2 AS 'com.blab.blab.UDF2'");
>     }
>
>     private static void createTables(HiveContext hiveContext) {
>         hiveContext.sql(" create table if not exists abc bla bla ");
>         hiveContext.sql(" create table if not exists def bla bla ");
>     }
>
>     private static void createBaseTableAfterProcessing(HiveContext hiveContext, String entity, int date) {
>         String sourcePath = "/user/xyz/Hive/xyz.db/data/entity=" + entity + "/date=" + date;
>
>         DataFrame sourceFrame = hiveContext.read().format("orc").load(sourcePath);
>
>         // rename fields from _col* to actual column names
>         DataFrame renamedSourceFrame = sourceFrame.toDF(colNameArr);
>         // filter data from fields
>         DataFrame dFrame = renamedSourceFrame
>                 .filter(renamedSourceFrame.col("col1").contains("ABC")
>                         .or(renamedSourceFrame.col("col1").contains("col3")))
>                 .orderBy("col2", "col3", "col4");
>
>         DataFrame dRemovedFrame = renamedSourceFrame.except(dFrame);
>
>         JavaRDD<Row> dRemovedRDD = dRemovedFrame.toJavaRDD();
>         JavaRDD<Row> sourceRdd = dFrame.toJavaRDD();
>
>         JavaRDD<Row> indexedRdd = sourceRdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
>             @Override
>             public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
>                 List<Row> rowList = new ArrayList<>();
>                 while (rowIterator.hasNext()) {
>                     Row row = rowIterator.next();
>                     List rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>                     Row updatedRow = RowFactory.create(rowAsList.toArray());
>                     rowList.add(updatedRow);
>                 }
>                 return rowList.iterator();
>             }
>         }, false).union(dRemovedRDD).persist(StorageLevels.MEMORY_AND_DISK_SER);
>
>         DataFrame dUpdatedDataFrame = hiveContext.createDataFrame(indexedRdd, renamedSourceFrame.schema());
>         hiveContext.registerDataFrameAsTable(dUpdatedDataFrame, "baseTable");
>     }
>
>     private static void createXView(HiveContext hiveContext, String entity, int date) {
>         hiveContext.sql("insert into table def partition(entity='" + entity + "',date=" + date + ") from baseTable group by bla bla");
>     }
>
>     private static void createYView(HiveContext hiveContext, String entity, int date) {
>         hiveContext.sql("insert into table abc partition(entity='" + entity + "',date=" + date + ") from baseTable group by bla bla");
>     }
>
>     private static final List iterate(List row) {
>         List rowAsList = new ArrayList<>(row);
>         String request = row.get(2).toString();
>         int n = Integer.parseInt(row.get(10).toString());
>         if (n == 0)
>             dMap.clear();
>         if (request != null && request.contains("ABC")) {
>             String key = request.substring(request.indexOf(":") + 2, request.indexOf("*", request.indexOf(":")) - 1);
>             if (request.contains("ABC DEF")) {
>                 dMap.put(key, request.substring(request.indexOf("as") + 3));
>                 request = request.replaceAll(key, "");
>             } else if (request.contains("ABC GHI")) {
>                 if (dMap.containsKey(key)) {
>                     request = request.replaceAll(key, dMap.get(key));
>                 }
>             } else if (request.contains("ABC IJK")) {
>                 if (dMap.containsKey(key)) {
>                     request = request.replaceAll(key, dMap.get(key));
>                     dMap.remove(key);
>                 }
>             }
>         }
>         rowAsList.set(2, request);
>         return rowAsList;
>     }
>
>     public static class DataWorker implements Callable<Void> {
>
>         private String entity;
>         private int date;
>         private HiveContext hiveContext;
>
>         public DataWorker(HiveContext hiveContext, String entity, int date) {
>             this.hiveContext = hiveContext;
>             this.entity = entity;
>             this.date = date;
>         }
>
>         @Override
>         public Void call() throws Exception {
>             createBaseTableAfterProcessing(this.hiveContext, entity, date);
>             createXView(this.hiveContext, entity, date);
>             createYView(this.hiveContext, entity, date);
>             this.hiveContext.clearCache();
>             return null;
>         }
>     }
> }
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-my-Spark-job-is-slow-and-it-throws-OOM-which-leads-YARN-killing-executors-tp24671.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org