[ https://issues.apache.org/jira/browse/HIVE-18301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16336946#comment-16336946 ]
liyunzhang edited comment on HIVE-18301 at 1/24/18 5:39 AM:
------------------------------------------------------------

HIVE-18301.patch provides one solution for transferring {{IOContext::inputPath}}:
{code}
inputRDD1              inputRDD2
    | CopyFunction         | CopyFunction
CopyRDD1               CopyRDD2
    |                      |
  MT_11                  MT_12
    |                      |
  RT_1                   RT_2
     \                    /
            Union
{code}
MT_11 calls the following stack to initialize IOContext::inputPath:
{code}
CombineHiveRecordReader#init
  -> HiveContextAwareRecordReader.initIOContext
  -> IOContext.setInputPath
{code}
inputRDD1 and inputRDD2 are RDDs of the same table, so CopyRDD1 and CopyRDD2 are the same RDD when RDD cache is enabled. MT_12, however, does not call CombineHiveRecordReader#init to initialize {{IOContext::inputPath}}, yet {{MapOperator#process(Writable value)}} still needs this value. IOContext is bound to a single thread, so its value differs from thread to thread. {{inputRDD1-CopyRDD1-MT_11-RT_1}} and {{inputRDD2-CopyRDD2-MT_12-RT_2}} are executed in different threads, so the IOContext cannot be shared between the two. For this issue, I propose the following solution: save the inputPath in CopyRDD1 when {{inputRDD1-CopyRDD1-MT_11-RT_1}} is executed; CopyRDD2 then gets the cached value, including the inputPath, from CopyRDD1, which is stored in the Spark cache manager; finally, reinitialize {{IOContext::inputPath}} in {{SparkMapRecordHandler#processRow}} before {{MapOperator#process(Writable value)}} runs in MT_12.

*Where to setInputPath?* In {{MapInput#CopyFunction#call}}, save the inputPath in the first element of the returned tuple:
{code}
 public MapInput(SparkPlan sparkPlan, JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
   this(sparkPlan, hadoopRDD, false);
@@ -79,10 +83,19 @@ public void setToCache(boolean toCache) {
     call(Tuple2<WritableComparable, Writable> tuple) throws Exception {
       if (conf == null) {
         conf = new Configuration();
+        conf.set("hive.execution.engine", "spark");
       }
-
-      return new Tuple2<WritableComparable, Writable>(tuple._1(),
-          WritableUtils.clone(tuple._2(), conf));
+      //            CopyFunction       MapFunction
+      // HADOOPRDD --------------> RDD1 -----------> RDD2 ...
+      // These transformations are in one stage and are executed by a single
+      // Spark task (thread), so IOContextMap.get(conf).getInputPath() will
+      // not be null here.
+      String inputPath = IOContextMap.get(conf).getInputPath().toString();
+      Text inputPathText = new Text(inputPath);
+      // Save the inputPath in the first element of the returned tuple.
+      return new Tuple2<WritableComparable, Writable>(inputPathText,
+          WritableUtils.clone(tuple._2(), conf));
     }
 }
{code}

*Where to getInputPath?* In {{SparkMapRecordHandler#processRow}}:
{code}
 public void processRow(Object key, Object value) throws IOException {
   ....
+  if (HiveConf.getBoolVar(jc, HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) {
+    Path inputPath = IOContextMap.get(jc).getInputPath();
+    // When inputPath is null, the record comes from the RDD cache.
+    if (inputPath == null) {
+      Text pathText = (Text) key;
+      IOContextMap.get(jc).setInputPath(new Path(pathText.toString()));
+    }
+  }
   ....
{code}

[~lirui], [~xuefuz], [~stakiar], [~csun], please give me your suggestions, thanks!
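To make the thread-local behavior concrete, here is a minimal standalone sketch in plain Java. It is my own simplified model of the per-thread map, not the real {{org.apache.hadoop.hive.ql.io.IOContextMap}} (and the path is a made-up example): the value set by the first task thread is simply invisible to the second one.
{code}
// Simplified stand-in for IOContextMap: one slot per thread (illustrative only).
public class IOContextThreadDemo {
  static final ThreadLocal<String> INPUT_PATH = new ThreadLocal<>();

  public static void main(String[] args) throws Exception {
    // t1 plays the role of inputRDD1-CopyRDD1-MT_11-RT_1: the record reader
    // initializes the input path, but only for its own thread.
    Thread t1 = new Thread(() -> {
      INPUT_PATH.set("/warehouse/src/part-00000"); // hypothetical path
      System.out.println("t1 sees: " + INPUT_PATH.get()); // non-null
    });
    t1.start();
    t1.join();

    // t2 plays the role of inputRDD2-CopyRDD2-MT_12-RT_2: it reuses the
    // cached RDD, so no record reader runs and its thread-local slot was
    // never initialized; this null is what later surfaces as the NPE.
    Thread t2 = new Thread(() ->
        System.out.println("t2 sees: " + INPUT_PATH.get())); // null
    t2.start();
    t2.join();
  }
}
{code}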
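For context on why {{CopyFunction#call}} is a safe place to read the path: the copy runs in the same stage, and therefore the same task thread, as the record reader that populated the IOContext. Below is a rough sketch of the caching wiring, simplified from {{MapInput}}; the class name and method signature here are illustrative, and the real class also does bookkeeping (such as tracking cached RDD ids) that is omitted.
{code}
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;

// Illustrative wiring only; CopyFunction refers to the patched class above.
class CachedMapInputSketch {
  JavaPairRDD<WritableComparable, Writable> transform(
      JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) {
    if (!toCache) {
      return hadoopRDD; // no cache: records are consumed where they are read
    }
    // CopyFunction#call executes in the same task thread as the record
    // reader during the first, cache-populating pass, so the thread-local
    // IOContext still holds the input path at that point. Stashing the path
    // into the tuple key lets it survive inside the cached blocks for later
    // passes that run in other threads.
    return hadoopRDD.mapToPair(new CopyFunction())
        .persist(StorageLevel.MEMORY_AND_DISK());
  }
}
{code}
With this wiring, the first pass over the cached RDD records the path, and the {{processRow}} change above restores it for any pass that starts from the cache.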
> Investigate to enable MapInput cache in Hive on Spark
> -----------------------------------------------------
>
>                 Key: HIVE-18301
>                 URL: https://issues.apache.org/jira/browse/HIVE-18301
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang
>            Assignee: liyunzhang
>            Priority: Major
>     Attachments: HIVE-18301.patch
>
>
> An IOContext problem was found before in MapTran when Spark RDD cache is enabled (HIVE-8920), so we disabled RDD cache in MapTran at [SparkPlanGenerator|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java#L202].
> The problem is that IOContext seems not to be initialized correctly in Spark yarn client/cluster mode, which causes exceptions like:
> {code}
> Job aborted due to stage failure: Task 93 in stage 0.0 failed 4 times, most recent failure: Lost task 93.3 in stage 0.0 (TID 616, bdpe48): java.lang.RuntimeException: Error processing row: java.lang.NullPointerException
>   at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:165)
>   at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
>   at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
>   at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
>   at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.hadoop.hive.ql.exec.AbstractMapOperator.getNominalPath(AbstractMapOperator.java:101)
>   at org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:516)
>   at org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187)
>   at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:546)
>   at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:152)
>   ... 12 more
> Driver stacktrace:
> {code}
> In yarn client/cluster mode, [ExecMapperContext#currentInputPath|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java#L109] is sometimes null when RDD cache is enabled.