Hi Richard, thanks very much for the reply. If I don't create threads the job
runs too slowly, since I have thousands of jobs (thousands of Hive partition
directories) to process. hiveContext.sql(...) runs fine and creates the output
I expect, so do I really need to call an action method? The job works as
expected; I am just hitting the physical memory limit and YARN kills the
executor (I saw it in the YARN logs). I believe the group-by queries shuffle a
lot of data around and create the problem. Please guide.

On Sun, Sep 13, 2015 at 2:09 AM, Richard W. Eggert II <
richard.egg...@gmail.com> wrote:

> Without a stack trace, I can't say for certain what is causing your
> OutOfMemoryError, but I do see a number of problems with your code.
>
> First of all, given that Spark is a parallel processing framework, it is
> almost never necessary to manually create a thread pool within the driver.
> You should instead chain together some RDDs and let Spark parallelize the
> work on the cluster for you.
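>
> For example (just a rough sketch, reusing the partitionFrame, hiveContext,
> and path layout from your code below, and ignoring the per-partition inserts
> for a moment), you could read all of the partition directories into a single
> DataFrame and let Spark schedule that work across the cluster, instead of
> running twenty driver threads yourself:
>
>     DataFrame combined = null;
>     for (Row row : partitionFrame.collect()) {
>         String[] splitArr = row.getString(0).split("/");
>         String entity = splitArr[0].split("=")[1];
>         String date = splitArr[1].split("=")[1];
>         // same directory layout as in your main()
>         DataFrame part = hiveContext.read().format("orc")
>                 .load("/user/xyz/Hive/xyz.db/data/entity=" + entity + "/date=" + date);
>         combined = (combined == null) ? part : combined.unionAll(part);
>     }
>     // actions on 'combined' now run as ordinary Spark jobs on the whole cluster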
>
> Secondly, unless I'm mistaken, SQLContext.sql (which HiveContext inherits)
> does not actually execute your SQL query. It just creates a DataFrame that
> represents the query. You have to invoke one of DataFrame's "action"
> methods (such as count, collect, foreach, or write/save) to cause Spark
> to create a job that actually executes the query. The documentation is
> admittedly a bit vague and misleading about this, however.
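>
> A quick illustration (the table and column names here are made up):
>
>     DataFrame df = hiveContext.sql("SELECT col1 FROM some_table WHERE col2 > 0");
>     // nothing has executed on the cluster yet; df only describes the query
>     long rows = df.count();   // this action is what triggers the Spark job
>
> (As far as I know, DDL and INSERT statements are commands that typically run
> as soon as sql() is called, which may be why your inserts produce output even
> without an explicit action.)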
>
> Rich
>
> On September 12, 2015, at 3:52 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>
> Hi, I have the following Spark driver program/job which reads ORC files
> (i.e. Hive partitions as HDFS directories), processes them as DataFrames, and
> uses them as tables in hiveContext.sql(). The job runs fine and gives correct
> results, but after an hour or so it hits the physical memory limit, YARN kills
> executors, and things get slower and slower. Please see the following code and
> help me identify the problem. I create 20 threads from the driver program and
> spawn them; the thread logic contains lambda functions that get executed on
> the executors. Please guide me, I am new to Spark. Thanks very much.
>
>     public class DataSpark {
>
>         // note: this static map is mutated inside iterate(), which runs on the executors
>         public static final Map<String, String> dMap = new LinkedHashMap<>();
>
>         public static final String[] colNameArr = new String[] {"_col0", "col2", "bla bla 45 columns"};
>
>     public static void main(String[] args) throws Exception {
>
>
>             Set<DataWorker> workers = new HashSet<>();
>
>             SparkConf sparkConf = new SparkConf().setAppName("SparkTest");
>             setSparkConfProperties(sparkConf);
>             SparkContext sc = new SparkContext(sparkConf);
>             final FileSystem fs = FileSystem.get(sc.hadoopConfiguration());
>             HiveContext hiveContext = createHiveContext(sc);
>
>             declareHiveUDFs(hiveContext);
>
>             DateTimeFormatter df = DateTimeFormat.forPattern("yyyyMMdd");
>             String yestday = "20150912";
>             hiveContext.sql(" use xyz ");
>             createTables(hiveContext);
>             DataFrame partitionFrame = hiveContext.sql(
>                     " show partitions data partition(date=\"" + yestday + "\")");
>
>             // collect the partition list on the driver and create one worker per non-empty partition
>             Row[] rowArr = partitionFrame.collect();
>             for (Row row : rowArr) {
>                 String[] splitArr = row.getString(0).split("/");
>                 String entity = splitArr[0].split("=")[1];
>                 int date = Integer.parseInt(splitArr[1].split("=")[1]);
>
>                 String sourcePath = "/user/xyz/Hive/xyz.db/data/entity=" + entity + "/date=" + date;
>                 Path spath = new Path(sourcePath);
>                 if (fs.getContentSummary(spath).getFileCount() > 0) {
>                     DataWorker worker = new DataWorker(hiveContext, entity, date);
>                     workers.add(worker);
>                 }
>             }
>
>             // 20 driver-side threads submit Spark jobs concurrently
>             ExecutorService executorService = Executors.newFixedThreadPool(20);
>             executorService.invokeAll(workers);
>             executorService.shutdown();
>
>
>             sc.stop();
>         }
>
>         private static void setSparkConfProperties(SparkConf sparkConf) {
>             sparkConf.set("spark.rdd.compress", "true");
>             sparkConf.set("spark.shuffle.consolidateFiles", "true");
>             sparkConf.set("spark.executor.logs.rolling.maxRetainedFiles", "90");
>             sparkConf.set("spark.executor.logs.rolling.strategy", "time");
>             sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
>             sparkConf.set("spark.shuffle.manager", "tungsten-sort");
>             sparkConf.set("spark.shuffle.memoryFraction", "0.5");
>             sparkConf.set("spark.storage.memoryFraction", "0.2");
>         }
>
>         private static HiveContext createHiveContext(SparkContext sc) {
>             HiveContext hiveContext = new HiveContext(sc);
>             hiveContext.setConf("spark.sql.codegen", "true");
>             hiveContext.setConf("spark.sql.unsafe.enabled", "true");
>             // keep this low to avoid the 200 small output part files Spark creates by default
>             hiveContext.setConf("spark.sql.shuffle.partitions", "15");
>             hiveContext.setConf("spark.sql.orc.filterPushdown", "true");
>             return hiveContext;
>         }
>
>         private static void declareHiveUDFs(HiveContext hiveContext) {
>             hiveContext.sql("CREATE TEMPORARY FUNCTION UDF1 AS 'com.blab.blab.UDF1'");
>             hiveContext.sql("CREATE TEMPORARY FUNCTION UDF2 AS 'com.blab.blab.UDF2'");
>         }
>
>         private static void createTables(HiveContext hiveContext) {
>             hiveContext.sql(" create table if not exists abc blab bla ");
>             hiveContext.sql(" create table if not exists def blab bla ");
>         }
>
>
>
>         private static void createBaseTableAfterProcessing(HiveContext hiveContext, String entity, int date) {
>             String sourcePath = "/user/xyz/Hive/xyz.db/data/entity=" + entity + "/date=" + date;
>
>             DataFrame sourceFrame = hiveContext.read().format("orc").load(sourcePath);
>
>             // rename fields from _col* to actual column names
>             DataFrame renamedSourceFrame = sourceFrame.toDF(colNameArr);
>             // filter and order the rows of interest
>             DataFrame dFrame = renamedSourceFrame
>                     .filter(renamedSourceFrame.col("col1").contains("ABC")
>                             .or(renamedSourceFrame.col("col1").contains("col3")))
>                     .orderBy("col2", "col3", "col4");
>
>             // rows that did not match the filter above
>             DataFrame dRemovedFrame = renamedSourceFrame.except(dFrame);
>
>             JavaRDD<Row> dRemovedRDD = dRemovedFrame.toJavaRDD();
>             JavaRDD<Row> sourceRdd = dFrame.toJavaRDD();
>
>             // rewrite column 2 of every matching row, then add the removed rows back
>             JavaRDD<Row> indexedRdd = sourceRdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
>                 @Override
>                 public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
>                     List<Row> rowList = new ArrayList<>();
>                     while (rowIterator.hasNext()) {
>                         Row row = rowIterator.next();
>                         List rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>                         Row updatedRow = RowFactory.create(rowAsList.toArray());
>                         rowList.add(updatedRow);
>                     }
>                     return rowList.iterator();
>                 }
>             }, false).union(dRemovedRDD).persist(StorageLevels.MEMORY_AND_DISK_SER);
>
>             DataFrame dUpdatedDataFrame = hiveContext.createDataFrame(indexedRdd, renamedSourceFrame.schema());
>             hiveContext.registerDataFrameAsTable(dUpdatedDataFrame, "baseTable");
>         }
>
>
>         private static void createXView(HiveContext hiveContext, String entity, int date) {
>             hiveContext.sql("insert into table def partition(entity='" + entity
>                     + "',date=" + date + ") select bla bla from baseTable group by bla bla");
>         }
>
>         private static void createYView(HiveContext hiveContext, String entity, int date) {
>             hiveContext.sql("insert into table abc partition(entity='" + entity
>                     + "',date=" + date + ") select bla bla from baseTable group by bla bla");
>         }
>
>
>         // rewrites element 2 (the request string) of a row, keeping state
>         // between rows in the static dMap
>         private static final List iterate(List row) {
>             List rowAsList = new ArrayList<>(row);
>             String request = row.get(2).toString();
>             int n = Integer.parseInt(row.get(10).toString());
>             if (n == 0)
>                 dMap.clear();
>             if (request != null && request.contains("ABC")) {
>                 String key = request.substring(request.indexOf(":") + 2,
>                         request.indexOf("*", request.indexOf(":")) - 1);
>                 if (request.contains("ABC DEF")) {
>                     dMap.put(key, request.substring(request.indexOf("as") + 3));
>                     request = request.replaceAll(key, "");
>                 } else if (request.contains("ABC GHI")) {
>                     if (dMap.containsKey(key)) {
>                         request = request.replaceAll(key, dMap.get(key));
>                     }
>                 } else if (request.contains("ABC IJK")) {
>                     if (dMap.containsKey(key)) {
>                         request = request.replaceAll(key, dMap.get(key));
>                         dMap.remove(key);
>                     }
>                 }
>             }
>             rowAsList.set(2, request);
>             return rowAsList;
>         }
>
>         public static class DataWorker implements Callable<Void> {
>
>             private String entity;
>             private int date;
>             private HiveContext hiveContext;
>
>             public DataWorker(HiveContext hiveContext, String entity, int date) {
>                 this.hiveContext = hiveContext;
>                 this.entity = entity;
>                 this.date = date;
>             }
>
>             @Override
>             public Void call() throws Exception {
>                 createBaseTableAfterProcessing(this.hiveContext, entity, date);
>                 createXView(this.hiveContext, entity, date);
>                 createYView(this.hiveContext, entity, date);
>
>                 this.hiveContext.clearCache();
>                 return null;
>             }
>         }
>
>     }
>
>
>
