Hi Ricky,

In your first try, you used flatMap, which flattens the split results into
a single RDD of strings. Each p in the following map is therefore a single
String, so p(18) ends up calling String.charAt(18) and throws the
StringIndexOutOfBoundsException whenever that field is shorter than 19
characters.
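
Roughly, this is what that version does (the type annotations are mine,
not from your code):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// flatMap flattens all the fields of all the lines into one RDD[String]
val fields: RDD[String] =
  sc.textFile("/user/spark/short_model").flatMap(_.split("\\t"))

// p is a single field (a String), so p(18) is p.charAt(18),
// which fails for any field shorter than 19 characters
val rows = fields.map(p => Row(p(0), p(1), p(2) /* ... p(71) */))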

With Terry's suggestion, you map each input line to an array of strings
and then map each array to a Row. The
java.lang.ArrayIndexOutOfBoundsException: 71 means that at least one of
those arrays does not have the expected 72 elements, most likely because
some line has fewer than 72 tab-separated fields (note that split("\\t")
also drops trailing empty fields; split("\\t", -1) keeps them). I would
check the length of each array before building the Row, and you can also
merge the two mapping operations into one, as sketched below.
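
For example, something along these lines (untested, and assuming every
valid line should have exactly 72 tab-separated fields; it reuses your
schemaString) skips malformed lines and builds the Rows in one pass:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val schema = StructType(schemaString.split(",").map(fieldName =>
  StructField(fieldName, StringType, true)))

val numFields = schema.fields.length  // 72 in your case

val rowRDD = sc.textFile("/user/spark/short_model")
  .map(_.split("\\t", -1))          // -1 keeps trailing empty fields
  .filter(_.length == numFields)    // drop lines with the wrong arity
  .map(fields => Row(fields: _*))   // one Row per well-formed line

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

If you would rather find the bad input than silently drop it, you can
first take a look at a few of the offending lines, e.g.

sc.textFile("/user/spark/short_model")
  .map(_.split("\\t", -1))
  .filter(_.length != numFields)
  .take(5)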

Thanks
Ai

On Fri, Aug 28, 2015 at 4:19 AM, our...@cnsuning.com <our...@cnsuning.com>
wrote:

> Terry:
> Unfortunately, the error remains after following your advice, but it has
> changed: now it is java.lang.ArrayIndexOutOfBoundsException: 71.
> The error log is as follows:
>
>
> 15/08/28 19:13:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 0.0 (TID 9, 10.104.74.7): java.lang.ArrayIndexOutOfBoundsException: 71
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:23)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:23)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at
> org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
>
> at
> org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>
>
>
>
>
>
>
>
>
> *From:* Terry Hole <hujie.ea...@gmail.com>
> *Date:* 2015-08-28 17:22
> *To:* our...@cnsuning.com
> *CC:* user <user@spark.apache.org>; hao.cheng <hao.ch...@intel.com>; Huang,
> Jie <jie.hu...@intel.com>
> *Subject:* Re: Job aborted due to stage failure:
> java.lang.StringIndexOutOfBoundsException: String index out of range: 18
> Ricky,
>
>
> You may need to use map instead of flatMap in your case
>
> *val rowRDD=sc.textFile("/user/spark/short_model").map(_.split("\\t")).map(p 
> => Row(...))*
>
>
> Thanks!
>
> -Terry
>
>
> On Fri, Aug 28, 2015 at 5:08 PM, our...@cnsuning.com <our...@cnsuning.com>
> wrote:
>
>> Hi all,
>>
>> I've run into a problem when using Spark SQL.
>>
>> The code is as follows:
>>
>>      *val schemaString =
>> "visitor_id,cust_num,gds_id,l1_gds_group_cd,l4_gds_group_cd,pc_gds_addcart,pc_gds_collect_num,pc_gds_four_page_pv,pc_gds_four_page_time,pc_gds_four_page_fromsearch_pv,pc_gds_four_page_fromlist_pv,pc_gds_four_page_fromrec_pv,pc_gds_four_page_fromcuxiao_pv,pc_four_page_num,pc_group_gds_addcart,pc_group_gds_collect,pc_group_fourpage_pv,pc_group_fourpage_time,pc_visitor_pv,pc_search_pv,pc_list_pv,pc_is_view1,pc_is_view,pc_view_cycle_days,pc_view_days,wap_gds_addcart,wap_gds_collect_num,wap_gds_four_page_pv,wap_gds_four_page_time,wap_gds_four_page_fromsearch_pv,wap_gds_four_page_fromlist_pv,wap_gds_four_page_fromrec_pv,wap_gds_four_page_fromcuxiao_pv,wap_four_page_num,wap_group_gds_addcart,wap_group_gds_collect,wap_group_fourpage_pv,wap_group_fourpage_time,wap_visitor_pv,wap_search_pv,wap_list_pv,wap_is_view1,wap_is_view,wap_view_cycle_days,wap_view_days,app_gds_addcart,app_gds_collect_num,app_gds_four_page_pv,app_gds_four_page_time,app_gds_four_page_fromsearch_pv,app_gds_four_page_fromlist_pv,app_gds_four_page_fromrec_pv,app_gds_four_page_fromcuxiao_pv,app_four_page_num,app_group_gds_addcart,app_group_gds_collect,app_group_fourpage_pv,app_group_fourpage_time,app_visitor_pv,app_search_pv,app_list_pv,app_is_view1,app_is_view,app_view_cycle_days,app_view_days,gds_score_desc,l4_gds_group_rate_n,decision_cycle_days,decision_days,decision_pv,is_order,statis_date"*
>> //*schemaString.split(",").length = 72*
>>
>> *import org.apache.spark.sql.Row;*
>>
>> *import org.apache.spark.sql.types.{StructType,StructField,StringType};*
>>
>> *val schema =StructType( schemaString.split(",").map(fieldName => 
>> StructField(fieldName, StringType, true)))*
>>
>> *val 
>> rowRDD=sc.textFile("/user/spark/short_model").flatMap(_.split("\\t")).map(p 
>> => 
>> Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13),p(14),p(15),p(16),p(17),p(18),p(19),p(20),p(21),p(22),p(23),p(24),p(25),p(26),p(27),p(28),p(29),p(30),p(31),p(32),p(33),p(34),p(35),p(36),p(37),p(38),p(39),p(40),p(41),p(42),p(43),p(44),p(45),p(46),p(47),p(48),p(49),p(50),p(51),p(52),p(53),p(54),p(55),p(56),p(57),p(58),p(59),p(60),p(61),p(62),p(63),p(64),p(65),p(66),p(67),p(68),p(69),p(70),p(71)))*
>>
>> *val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)*
>>
>> *peopleDataFrame.registerTempTable("alg")*
>>
>> *val results = sqlContext.sql("SELECT count(*) FROM alg")*
>>
>> *results.collect()*
>>
>>
>> The error log is as follows:
>>
>> 15/08/28 17:00:54 WARN TaskSetManager: Lost task 56.0 in
>> stage 9.0 (TID 71, 10.104.74.8): java.lang.StringIndexOutOfBoundsException:
>> String index out of range: 18
>> at java.lang.String.charAt(String.java:658)
>> at
>> scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39)
>> at
>> $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
>> at
>> $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at
>> org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
>> at
>> org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.1 in stage 9.0
>> (TID 72, 10.104.74.8, NODE_LOCAL, 1415 bytes)
>> 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.1 in stage 9.0 (TID
>> 72) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException
>> (String index out of range: 18) [duplicate 1]
>> 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.2 in stage 9.0
>> (TID 73, 10.104.74.8, NODE_LOCAL, 1415 bytes)
>> 15/08/28 17:00:54 INFO TaskSetManager: Lost task 45.0 in stage 9.0 (TID
>> 70) on executor 10.104.74.6: java.lang.StringIndexOutOfBoundsException
>> (String index out of range: 18) [duplicate 2]
>> 15/08/28 17:00:54 INFO TaskSetManager: Starting task 45.1 in stage 9.0
>> (TID 74, 10.104.74.6, NODE_LOCAL, 1415 bytes)
>> 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.2 in stage 9.0 (TID
>> 73) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException
>> (String index out of range: 18) [duplicate 3]
>> 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.3 in stage 9.0
>> (TID 75, 10.104.74.8, NODE_LOCAL, 1415 bytes)
>> 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.3 in stage 9.0 (TID
>> 75) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException
>> (String index out of range: 18) [duplicate 4]
>> 15/08/28 17:00:54 ERROR TaskSetManager: Task 56 in stage 9.0 failed 4
>> times; aborting job
>> 15/08/28 17:00:54 INFO TaskSetManager: Lost task 45.1 in stage 9.0 (TID
>> 74) on executor 10.104.74.6: java.lang.StringIndexOutOfBoundsException
>> (String index out of range: 18) [duplicate 5]
>> 15/08/28 17:00:54 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose
>> tasks have all completed, from pool
>> 15/08/28 17:00:54 INFO TaskSchedulerImpl: Cancelling stage 9
>> 15/08/28 17:00:54 INFO DAGScheduler: ShuffleMapStage 9 (collect at
>> <console>:31) failed in 0.206 s
>> 15/08/28 17:00:54 INFO DAGScheduler: Job 6 failed: collect at
>> <console>:31, took 0.293903 s
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 56 in stage 9.0 failed 4 times, most recent failure: Lost task 56.3 in
>> stage 9.0 (TID 75, 10.104.74.8): java.lang.StringIndexOutOfBoundsException:
>> String index out of range: 18
>> at java.lang.String.charAt(String.java:658)
>> at
>> scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39)
>> at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
>> at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at
>> org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
>> at
>> org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>>
>>
>>
>>
>>
>>
>> ------------------------------
>> Ricky  Ou(欧   锐)
>>
>> Department: Suning Commerce, IT Headquarters Technology Support R&D
>> Center, Big Data Center, Data Platform Development Department
>>
>> email  : our...@cnsuning.com <14070...@cnsuning.com>
>>
>>
>


-- 
Best
Ai
