Hi, can you try something like:
val rowRDD=sc.textFile("/user/spark/short_model").map{ line => val p = line.split("\\t<file:///\\t>") if (p.length >=72) { Row(p(0), p(1)…) } else { throw new RuntimeException(s“failed in parsing $line”) } } From the log “java.lang.ArrayIndexOutOfBoundsException: 71”, seems something wrong with your data, is that your intention? Thanks, Hao From: our...@cnsuning.com [mailto:our...@cnsuning.com] Sent: Friday, August 28, 2015 7:20 PM To: Terry Hole Cc: user Subject: Re: Re: Job aborted due to stage failure: java.lang.StringIndexOutOfBoundsException: String index out of range: 18 Terry: Unfortunately, error remains when use your advice.But error is changed ,now error is java.lang.ArrayIndexOutOfBoundsException: 71 error log as following: 15/08/28 19:13:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 9, 10.104.74.7): java.lang.ArrayIndexOutOfBoundsException: 71 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:23) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:23) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130) at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) From: Terry Hole<mailto:hujie.ea...@gmail.com> Date: 2015-08-28 17:22 To: our...@cnsuning.com<mailto:our...@cnsuning.com> CC: user<mailto:user@spark.apache.org>; hao.cheng<mailto:hao.ch...@intel.com>; Huang, Jie<mailto:jie.hu...@intel.com> Subject: Re: Job aborted due to stage failure: java.lang.StringIndexOutOfBoundsException: String index out of range: 18 Ricky, You may need to use map instead of flatMap in your case val rowRDD=sc.textFile("/user/spark/short_model").map(_.split("\\t<file:///\\t>")).map(p => Row(...)) Thanks! -Terry On Fri, Aug 28, 2015 at 5:08 PM, our...@cnsuning.com<mailto:our...@cnsuning.com> <our...@cnsuning.com<mailto:our...@cnsuning.com>> wrote: hi all, when using spark sql ,A problem bothering me. the codeing as following: val schemaString = "visitor_id,cust_num,gds_id,l1_gds_group_cd,l4_gds_group_cd,pc_gds_addcart,pc_gds_collect_num,pc_gds_four_page_pv,pc_gds_four_page_time,pc_gds_four_page_fromsearch_pv,pc_gds_four_page_fromlist_pv,pc_gds_four_page_fromrec_pv,pc_gds_four_page_fromcuxiao_pv,pc_four_page_num,pc_group_gds_addcart,pc_group_gds_collect,pc_group_fourpage_pv,pc_group_fourpage_time,pc_visitor_pv,pc_search_pv,pc_list_pv,pc_is_view1,pc_is_view,pc_view_cycle_days,pc_view_days,wap_gds_addcart,wap_gds_collect_num,wap_gds_four_page_pv,wap_gds_four_page_time,wap_gds_four_page_fromsearch_pv,wap_gds_four_page_fromlist_pv,wap_gds_four_page_fromrec_pv,wap_gds_four_page_fromcuxiao_pv,wap_four_page_num,wap_group_gds_addcart,wap_group_gds_collect,wap_group_fourpage_pv,wap_group_fourpage_time,wap_visitor_pv,wap_search_pv,wap_list_pv,wap_is_view1,wap_is_view,wap_view_cycle_days,wap_view_days,app_gds_addcart,app_gds_collect_num,app_gds_four_page_pv,app_gds_four_page_time,app_gds_four_page_fromsearch_pv,app_gds_four_page_fromlist_pv,app_gds_four_page_fromrec_pv,app_gds_four_page_fromcuxiao_pv,app_four_page_num,app_group_gds_addcart,app_group_gds_collect,app_group_fourpage_pv,app_group_fourpage_time,app_visitor_pv,app_search_pv,app_list_pv,app_is_view1,app_is_view,app_view_cycle_days,app_view_days,gds_score_desc,l4_gds_group_rate_n,decision_cycle_days,decision_days,decision_pv,is_order,statis_date" //schemaString.length=72 import org.apache.spark.sql.Row; import org.apache.spark.sql.types.{StructType,StructField,StringType}; val schema =StructType( schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true))) val rowRDD=sc.textFile("/user/spark/short_model").flatMap(_.split("\\t<file:///\\t>")).map(p => Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13),p(14),p(15),p(16),p(17),p(18),p(19),p(20),p(21),p(22),p(23),p(24),p(25),p(26),p(27),p(28),p(29),p(30),p(31),p(32),p(33),p(34),p(35),p(36),p(37),p(38),p(39),p(40),p(41),p(42),p(43),p(44),p(45),p(46),p(47),p(48),p(49),p(50),p(51),p(52),p(53),p(54),p(55),p(56),p(57),p(58),p(59),p(60),p(61),p(62),p(63),p(64),p(65),p(66),p(67),p(68),p(69),p(70),p(71))) val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) peopleDataFrame.registerTempTable("alg") val results = sqlContext.sql("SELECT count(*) FROM alg") results.collect() the error log as following: 5/08/28 17:00:54 WARN TaskSetManager: Lost task 56.0 in stage 9.0 (TID 71, 10.104.74.8): java.lang.StringIndexOutOfBoundsException: String index out of range: 18 at java.lang.String.charAt(String.java:658) at scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39) at $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26) at $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130) at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.1 in stage 9.0 (TID 72, 10.104.74.8, NODE_LOCAL, 1415 bytes) 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.1 in stage 9.0 (TID 72) on executor 10.104.74.8<http://10.104.74.8>: java.lang.StringIndexOutOfBoundsException (String index out of range: 18) [duplicate 1] 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.2 in stage 9.0 (TID 73, 10.104.74.8, NODE_LOCAL, 1415 bytes) 15/08/28 17:00:54 INFO TaskSetManager: Lost task 45.0 in stage 9.0 (TID 70) on executor 10.104.74.6<http://10.104.74.6>: java.lang.StringIndexOutOfBoundsException (String index out of range: 18) [duplicate 2] 15/08/28 17:00:54 INFO TaskSetManager: Starting task 45.1 in stage 9.0 (TID 74, 10.104.74.6, NODE_LOCAL, 1415 bytes) 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.2 in stage 9.0 (TID 73) on executor 10.104.74.8<http://10.104.74.8>: java.lang.StringIndexOutOfBoundsException (String index out of range: 18) [duplicate 3] 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.3 in stage 9.0 (TID 75, 10.104.74.8, NODE_LOCAL, 1415 bytes) 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.3 in stage 9.0 (TID 75) on executor 10.104.74.8<http://10.104.74.8>: java.lang.StringIndexOutOfBoundsException (String index out of range: 18) [duplicate 4] 15/08/28 17:00:54 ERROR TaskSetManager: Task 56 in stage 9.0 failed 4 times; aborting job 15/08/28 17:00:54 INFO TaskSetManager: Lost task 45.1 in stage 9.0 (TID 74) on executor 10.104.74.6<http://10.104.74.6>: java.lang.StringIndexOutOfBoundsException (String index out of range: 18) [duplicate 5] 15/08/28 17:00:54 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool 15/08/28 17:00:54 INFO TaskSchedulerImpl: Cancelling stage 9 15/08/28 17:00:54 INFO DAGScheduler: ShuffleMapStage 9 (collect at <console>:31) failed in 0.206 s 15/08/28 17:00:54 INFO DAGScheduler: Job 6 failed: collect at <console>:31, took 0.293903 s org.apache.spark.SparkException: Job aborted due to stage failure: Task 56 in stage 9.0 failed 4 times, most recent failure: Lost task 56.3 in stage 9.0 (TID 75, 10.104.74.8): java.lang.StringIndexOutOfBoundsException: String index out of range: 18 at java.lang.String.charAt(String.java:658) at scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130) at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org<http://org.apache.spark.scheduler.DAGScheduler.org>$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) ________________________________ Ricky Ou(欧 锐) 部 门:苏宁云商 IT总部技术支撑研发中心大 数据中心数据平台开发部 email : our...@cnsuning.com<mailto:14070...@cnsuning.com>