Hi, can you try something like:
val rowRDD=sc.textFile("/user/spark/short_model").map{ line =>
val p = line.split("\\t<file:///\\t>")
if (p.length >=72) {
Row(p(0), p(1)…)
} else {
throw new RuntimeException(s“failed in parsing $line”)
}
}
From the log “java.lang.ArrayIndexOutOfBoundsException: 71”, seems something
wrong with your data, is that your intention?
Thanks,
Hao
From: [email protected] [mailto:[email protected]]
Sent: Friday, August 28, 2015 7:20 PM
To: Terry Hole
Cc: user
Subject: Re: Re: Job aborted due to stage failure:
java.lang.StringIndexOutOfBoundsException: String index out of range: 18
Terry:
Unfortunately, error remains when use your advice.But error is changed ,now
error is java.lang.ArrayIndexOutOfBoundsException: 71
error log as following:
15/08/28 19:13:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have
all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID
9, 10.104.74.7): java.lang.ArrayIndexOutOfBoundsException: 71
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:23)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:23)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
From: Terry Hole<mailto:[email protected]>
Date: 2015-08-28 17:22
To: [email protected]<mailto:[email protected]>
CC: user<mailto:[email protected]>; hao.cheng<mailto:[email protected]>;
Huang, Jie<mailto:[email protected]>
Subject: Re: Job aborted due to stage failure:
java.lang.StringIndexOutOfBoundsException: String index out of range: 18
Ricky,
You may need to use map instead of flatMap in your case
val
rowRDD=sc.textFile("/user/spark/short_model").map(_.split("\\t<file:///\\t>")).map(p
=> Row(...))
Thanks!
-Terry
On Fri, Aug 28, 2015 at 5:08 PM,
[email protected]<mailto:[email protected]>
<[email protected]<mailto:[email protected]>> wrote:
hi all,
when using spark sql ,A problem bothering me.
the codeing as following:
val schemaString =
"visitor_id,cust_num,gds_id,l1_gds_group_cd,l4_gds_group_cd,pc_gds_addcart,pc_gds_collect_num,pc_gds_four_page_pv,pc_gds_four_page_time,pc_gds_four_page_fromsearch_pv,pc_gds_four_page_fromlist_pv,pc_gds_four_page_fromrec_pv,pc_gds_four_page_fromcuxiao_pv,pc_four_page_num,pc_group_gds_addcart,pc_group_gds_collect,pc_group_fourpage_pv,pc_group_fourpage_time,pc_visitor_pv,pc_search_pv,pc_list_pv,pc_is_view1,pc_is_view,pc_view_cycle_days,pc_view_days,wap_gds_addcart,wap_gds_collect_num,wap_gds_four_page_pv,wap_gds_four_page_time,wap_gds_four_page_fromsearch_pv,wap_gds_four_page_fromlist_pv,wap_gds_four_page_fromrec_pv,wap_gds_four_page_fromcuxiao_pv,wap_four_page_num,wap_group_gds_addcart,wap_group_gds_collect,wap_group_fourpage_pv,wap_group_fourpage_time,wap_visitor_pv,wap_search_pv,wap_list_pv,wap_is_view1,wap_is_view,wap_view_cycle_days,wap_view_days,app_gds_addcart,app_gds_collect_num,app_gds_four_page_pv,app_gds_four_page_time,app_gds_four_page_fromsearch_pv,app_gds_four_page_fromlist_pv,app_gds_four_page_fromrec_pv,app_gds_four_page_fromcuxiao_pv,app_four_page_num,app_group_gds_addcart,app_group_gds_collect,app_group_fourpage_pv,app_group_fourpage_time,app_visitor_pv,app_search_pv,app_list_pv,app_is_view1,app_is_view,app_view_cycle_days,app_view_days,gds_score_desc,l4_gds_group_rate_n,decision_cycle_days,decision_days,decision_pv,is_order,statis_date"
//schemaString.length=72
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};
val schema =StructType( schemaString.split(",").map(fieldName =>
StructField(fieldName, StringType, true)))
val
rowRDD=sc.textFile("/user/spark/short_model").flatMap(_.split("\\t<file:///\\t>")).map(p
=>
Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13),p(14),p(15),p(16),p(17),p(18),p(19),p(20),p(21),p(22),p(23),p(24),p(25),p(26),p(27),p(28),p(29),p(30),p(31),p(32),p(33),p(34),p(35),p(36),p(37),p(38),p(39),p(40),p(41),p(42),p(43),p(44),p(45),p(46),p(47),p(48),p(49),p(50),p(51),p(52),p(53),p(54),p(55),p(56),p(57),p(58),p(59),p(60),p(61),p(62),p(63),p(64),p(65),p(66),p(67),p(68),p(69),p(70),p(71)))
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("alg")
val results = sqlContext.sql("SELECT count(*) FROM alg")
results.collect()
the error log as following:
5/08/28 17:00:54 WARN TaskSetManager: Lost task 56.0 in stage 9.0
(TID 71, 10.104.74.8): java.lang.StringIndexOutOfBoundsException: String index
out of range: 18
at java.lang.String.charAt(String.java:658)
at scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39)
at
$line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
at
$line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.1 in stage 9.0 (TID 72,
10.104.74.8, NODE_LOCAL, 1415 bytes)
15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.1 in stage 9.0 (TID 72) on
executor 10.104.74.8<http://10.104.74.8>:
java.lang.StringIndexOutOfBoundsException (String index out of range: 18)
[duplicate 1]
15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.2 in stage 9.0 (TID 73,
10.104.74.8, NODE_LOCAL, 1415 bytes)
15/08/28 17:00:54 INFO TaskSetManager: Lost task 45.0 in stage 9.0 (TID 70) on
executor 10.104.74.6<http://10.104.74.6>:
java.lang.StringIndexOutOfBoundsException (String index out of range: 18)
[duplicate 2]
15/08/28 17:00:54 INFO TaskSetManager: Starting task 45.1 in stage 9.0 (TID 74,
10.104.74.6, NODE_LOCAL, 1415 bytes)
15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.2 in stage 9.0 (TID 73) on
executor 10.104.74.8<http://10.104.74.8>:
java.lang.StringIndexOutOfBoundsException (String index out of range: 18)
[duplicate 3]
15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.3 in stage 9.0 (TID 75,
10.104.74.8, NODE_LOCAL, 1415 bytes)
15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.3 in stage 9.0 (TID 75) on
executor 10.104.74.8<http://10.104.74.8>:
java.lang.StringIndexOutOfBoundsException (String index out of range: 18)
[duplicate 4]
15/08/28 17:00:54 ERROR TaskSetManager: Task 56 in stage 9.0 failed 4 times;
aborting job
15/08/28 17:00:54 INFO TaskSetManager: Lost task 45.1 in stage 9.0 (TID 74) on
executor 10.104.74.6<http://10.104.74.6>:
java.lang.StringIndexOutOfBoundsException (String index out of range: 18)
[duplicate 5]
15/08/28 17:00:54 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have
all completed, from pool
15/08/28 17:00:54 INFO TaskSchedulerImpl: Cancelling stage 9
15/08/28 17:00:54 INFO DAGScheduler: ShuffleMapStage 9 (collect at
<console>:31) failed in 0.206 s
15/08/28 17:00:54 INFO DAGScheduler: Job 6 failed: collect at <console>:31,
took 0.293903 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 56 in
stage 9.0 failed 4 times, most recent failure: Lost task 56.3 in stage 9.0 (TID
75, 10.104.74.8): java.lang.StringIndexOutOfBoundsException: String index out
of range: 18
at java.lang.String.charAt(String.java:658)
at scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org<http://org.apache.spark.scheduler.DAGScheduler.org>$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
________________________________
Ricky Ou(欧 锐)
部 门:苏宁云商 IT总部技术支撑研发中心大
数据中心数据平台开发部
email : [email protected]<mailto:[email protected]>