Ok, it's a bug in spark. I've submitted a patch: https://issues.apache.org/jira/browse/SPARK-2009
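The short version, for anyone skimming the thread below: a windowed DStream only treats a batch time as valid when its offset from zeroTime is a multiple of the slide duration, but the JobGenerator still asks every ReceiverInputDStream for received-block info keyed by that batch time, and the lookup throws when the key is absent. A minimal sketch of the alignment arithmetic (not the actual Spark source, just the check it performs):

===================
// Sketch of the slide-alignment rule a windowed DStream applies.
// Times are epoch milliseconds, matching the logs below.
def isTimeValid(time: Long, zeroTime: Long, slideMs: Long): Boolean =
  (time - zeroTime) % slideMs == 0

// From the stack trace below: zeroTime = 1401754907000, slide = 4000 ms.
isTimeValid(1401754908000L, 1401754907000L, 4000L) // false: difference 1000 ms
isTimeValid(1401754911000L, 1401754907000L, 4000L) // true: difference 4000 ms
===================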
On Mon, Jun 2, 2014 at 8:39 PM, Vadim Chekan <kot.bege...@gmail.com> wrote:

> Thanks for looking into this, Tathagata.
>
> Are you looking for traces of the ReceiverInputDStream.clearMetadata call?
> Here is the log: http://wepaste.com/vchekan
>
> Vadim.
>
> On Mon, Jun 2, 2014 at 5:58 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> Can you give all the logs? Would like to see what is clearing the key
>> "1401754908000 ms".
>>
>> TD
>>
>> On Mon, Jun 2, 2014 at 5:38 PM, Vadim Chekan <kot.bege...@gmail.com> wrote:
>>
>>> Ok, it seems like "Time ... is invalid" is part of the normal workflow:
>>> the windowed DStream ignores RDDs at moments in time that do not align
>>> with the window's sliding interval. But why I am getting the exception
>>> is still unclear. Here is the full stack:
>>>
>>> 14/06/02 17:21:48 INFO WindowedDStream: Time 1401754908000 ms is invalid
>>> as zeroTime is 1401754907000 ms and slideDuration is 4000 ms and
>>> difference is 1000 ms
>>> 14/06/02 17:21:48 ERROR OneForOneStrategy: key not found: 1401754908000 ms
>>> java.util.NoSuchElementException: key not found: 1401754908000 ms
>>>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>>>     at scala.collection.AbstractMap.default(Map.scala:58)
>>>     at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.getReceivedBlockInfo(ReceiverInputDStream.scala:77)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:225)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:223)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:223)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
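The stack above shows the failure mode: JobGenerator.generateJobs maps over the registered receiver input streams and calls getReceivedBlockInfo for the batch time, and the underlying mutable HashMap throws when that key was already cleared (or was never stored). A hypothetical simplification of that lookup, just to show where the NoSuchElementException comes from:

===================
import scala.collection.mutable

// Hypothetical stand-in for the per-batch block-info map; the real
// structure in ReceiverInputDStream is keyed by batch Time.
val receivedBlockInfo = mutable.HashMap[Long, Seq[String]]()
receivedBlockInfo(1401754907000L) = Seq("block-0")

// HashMap.apply on a missing key throws, exactly as in the trace:
// java.util.NoSuchElementException: key not found: 1401754908000
receivedBlockInfo(1401754908000L)
===================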
>>>
>>> On Mon, Jun 2, 2014 at 5:22 PM, Vadim Chekan <kot.bege...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am getting an error:
>>>> ================
>>>> 14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is
>>>> invalid as zeroTime is 1401753986000 ms and slideDuration is 4000 ms
>>>> and difference is 6000 ms
>>>> 14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000 ms
>>>> ================
>>>>
>>>> My relevant code is:
>>>> ===================
>>>> val ssc = new StreamingContext(conf, Seconds(1))
>>>> val messageEvents = events.
>>>>   flatMap(e => evaluatorCached.value.find(e)).
>>>>   window(Seconds(8), Seconds(4))
>>>> messageEvents.print()
>>>> ===================
>>>>
>>>> Seems all right to me: the window duration (8) is the slide duration
>>>> (4) * 2, and both are multiples of the streaming context batch
>>>> duration (1). So, what's the problem?
>>>>
>>>> Spark-v1.0.0

--
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is explicitly specified
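For anyone who lands here from the same log lines: the setup in the original post is legal. Both arguments to window() must be multiples of the StreamingContext batch duration, which holds for Seconds(8)/Seconds(4) against a Seconds(1) batch, so the crash comes from the bug above, not from the window parameters. A minimal self-contained version of that setup (the local master, socket source, host, and port are placeholders, not from the thread):

===================
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowSketch")
    val ssc = new StreamingContext(conf, Seconds(1)) // 1 s batches, as in the thread

    // Placeholder source; the thread's `events` stream comes from elsewhere.
    val events = ssc.socketTextStream("localhost", 9999)

    // Window 8 s, slide 4 s: both are multiples of the 1 s batch duration.
    val windowed = events.window(Seconds(8), Seconds(4))
    windowed.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
===================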