see below a more complete version of the code.
the firstDate (previously minDate) should not be null, I even added an
extra "filter( _._2 != null)" before the flatMap and the error is
still there.
What I don't understand is why I have the error on
dateSeq.las.plusDays and not on dateSeq.last.isBefore (in the
condition).
I also tried changing the allDates function to use a while loop but i
got the same error.
def allDates(dateStart: DateTime, dateEnd: DateTime): Seq[DateTime] = {
var dateSeq = Seq(dateStart)
var currentDate = dateStart
while (currentDate.isBefore(dateEnd)){
dateSeq = dateSeq :+ currentDate
currentDate = currentDate.plusDays(1)
}
return dateSeq
}
val videoAllDates = events.select("player_id", "current_ts")
.filter("player_id is not null") .filter("current_ts is not
null") .map( row => (row.getString(0),
timestampToDate(row.getString(1)))) .filter(r =>
r._2.isAfter(minimumDate)) .reduceByKey(minDateTime)
.flatMapValues( firstDate => allDates(firstDate, endDate))
And the stack trace.
15/11/10 21:10:36 INFO MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 2 to [email protected]:50821
15/11/10 21:10:36 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 2 is 695 bytes
15/11/10 21:10:36 INFO MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 1 to [email protected]:50821
15/11/10 21:10:36 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 1 is 680 bytes
15/11/10 21:10:36 INFO TaskSetManager: Starting task 206.0 in stage 3.0
(TID 798, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
15/11/10 21:10:36 WARN TaskSetManager: Lost task 205.0 in stage 3.0 (TID
797, R610-2.pro.hupi.loc): java.lang.NullPointerException
at org.joda.time.DateTime.plusDays(DateTime.java:1070)
at Heatmap$.allDates(heatmap.scala:34)
at Heatmap$$anonfun$12.apply(heatmap.scala:97)
at Heatmap$$anonfun$12.apply(heatmap.scala:97)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$flatMapValues$1$$anonfun$apply$16.apply(PairRDDFunctions.scala:686)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$flatMapValues$1$$anonfun$apply$16.apply(PairRDDFunctions.scala:685)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/11/10 21:10:36 INFO TaskSetManager: Starting task 205.1 in stage 3.0
(TID 799, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
15/11/10 21:10:36 INFO TaskSetManager: Finished task 206.0 in stage 3.0
(TID 798) in 13 ms on R610-2.pro.hupi.loc (182/410)
15/11/10 21:10:36 INFO TaskSetManager: Starting task 207.0 in stage 3.0
(TID 800, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
15/11/10 21:10:36 INFO TaskSetManager: Lost task 205.1 in stage 3.0 (TID
799) on executor R610-2.pro.hupi.loc: java.lang.NullPointerException (null)
[duplicate 1]
15/11/10 21:10:36 INFO TaskSetManager: Starting task 205.2 in stage 3.0
(TID 801, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
15/11/10 21:10:36 INFO TaskSetManager: Finished task 207.0 in stage 3.0
(TID 800) in 14 ms on R610-2.pro.hupi.loc (183/410)
15/11/10 21:10:36 INFO TaskSetManager: Starting task 208.0 in stage 3.0
(TID 802, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
15/11/10 21:10:36 INFO TaskSetManager: Lost task 205.2 in stage 3.0 (TID
801) on executor R610-2.pro.hupi.loc: java.lang.NullPointerException (null)
[duplicate 2]
15/11/10 21:10:36 INFO TaskSetManager: Starting task 205.3 in stage 3.0
(TID 803, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
15/11/10 21:10:36 INFO TaskSetManager: Finished task 208.0 in stage 3.0
(TID 802) in 12 ms on R610-2.pro.hupi.loc (184/410)
15/11/10 21:10:36 INFO TaskSetManager: Starting task 209.0 in stage 3.0
(TID 804, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
15/11/10 21:10:36 INFO TaskSetManager: Lost task 205.3 in stage 3.0 (TID
803) on executor R610-2.pro.hupi.loc: java.lang.NullPointerException (null)
[duplicate 3]
15/11/10 21:10:36 ERROR TaskSetManager: Task 205 in stage 3.0 failed 4
times; aborting job
15/11/10 21:10:36 INFO YarnScheduler: Cancelling stage 3
15/11/10 21:10:36 INFO YarnScheduler: Stage 3 was cancelled
15/11/10 21:10:36 INFO DAGScheduler: Job 1 failed: saveAsTextFile at
heatmap.scala:116, took 45,476562 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 205 in stage 3.0 failed 4 times, most recent
failure: Lost task 205.3 in stage 3.0 (TID 803, R610-2.pro.hupi.loc):
java.lang.NullPointerException
at org.joda.time.DateTime.plusDays(DateTime.java:1070)
at Heatmap$.allDates(heatmap.scala:34)
at Heatmap$$anonfun$12.apply(heatmap.scala:97)
at Heatmap$$anonfun$12.apply(heatmap.scala:97)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$flatMapValues$1$$anonfun$apply$16.apply(PairRDDFunctions.scala:686)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$flatMapValues$1$$anonfun$apply$16.apply(PairRDDFunctions.scala:685)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1210)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1198)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1198)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1400)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1361)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/11/10 21:10:36 WARN TaskSetManager: Lost task 209.0 in stage 3.0 (TID
804, R610-2.pro.hupi.loc): TaskKilled (killed intentionally)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/avro-tools-1.7.6-cdh5.4.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2015-11-10 18:39 GMT+01:00 Ted Yu <[email protected]>:
> Can you show the stack trace for the NPE ?
>
> Which release of Spark are you using ?
>
> Cheers
>
> On Tue, Nov 10, 2015 at 8:20 AM, romain sagean <[email protected]>
> wrote:
>
>> Hi community,
>> I try to apply the function below during a flatMapValues or a map but I
>> get a nullPointerException with the plusDays(1). What did I miss ?
>>
>> def allDates(dateSeq: Seq[DateTime], dateEnd: DateTime): Seq[DateTime] = {
>> if (dateSeq.last.isBefore(dateEnd)){
>> allDates(dateSeq:+ dateSeq.last.plusDays(1), dateEnd)
>> } else {
>> dateSeq
>> }
>> }
>>
>> val videoAllDates = .select("player_id", "mindate").flatMapValues(
>> minDate => allDates(Seq(minDate), endDate))
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [email protected]
>> For additional commands, e-mail: [email protected]
>>
>>
>
--
*Romain Sagean*
*[email protected] <[email protected]>*