Indeed, you are right! I felt like I was missing or misunderstanding something. Thank you so much!
Ali Gouta

On Thu, Dec 10, 2015 at 10:04 PM, Cody Koeninger <[email protected]> wrote:

> I'm a little confused as to why you have fake events rather than just
> doing foreachRDD or foreachPartition on your Kafka stream and updating the
> accumulator there. I'd expect that to run each batch even if the batch had
> 0 Kafka messages in it.
>
> On Thu, Dec 10, 2015 at 2:05 PM, AliGouta <[email protected]> wrote:
>
>> I am actually running out of options. In my Spark Streaming application, I
>> want to keep state on some keys. I am getting events from Kafka, and I
>> extract keys from each event, say userID. When there are no events coming
>> from Kafka, I want to keep updating a counter for each user ID every 3
>> seconds, since I configured the batch duration of my StreamingContext to
>> 3 seconds.
>>
>> The way I am doing it now might be ugly, but at least it works. I have an
>> accumulableCollection like this:
>>
>>     val userID = ssc.sparkContext.accumulableCollection(new
>>       mutable.HashMap[String,Long]())
>>
>> Then I create a "fake" event and keep pushing it to my Spark Streaming
>> context as follows:
>>
>>     val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
>>     for (i <- 1 to 100) {
>>       rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE"))
>>       Thread.sleep(3000)
>>     }
>>     val inputStream = ssc.queueStream(rddQueue)
>>
>>     inputStream.foreachRDD( UPDATE_MY_ACCUMULATOR )
>>
>> This gives me access to my accumulableCollection and lets me update the
>> counters of all userIDs. Up to now everything works fine. However, when I
>> change my loop from:
>>
>>     for (i <- 1 to 100) {}   // this is for testing
>>
>> to:
>>
>>     while (true) {}   // this is to let me access and update my
>>                       // accumulator through the whole application
>>                       // life cycle
>>
>> then when I run my ./spark-submit, my application gets stuck at this
>> stage:
>>
>>     15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager
>>     slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1,
>>     slave1.cluster.example, 38959)
>>
>> Any clue on how to resolve this? Is there a straightforward way to update
>> the values of my userIDs, rather than creating a useless RDD and pushing
>> it periodically to the queueStream?
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Replaying-an-RDD-in-spark-streaming-to-update-an-accumulator-tp25672.html
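
A minimal sketch of the approach Cody describes, in case it helps anyone reading this thread later. It is not code from the application discussed above, and it rests on several assumptions: Spark 1.x with the spark-streaming-kafka (Kafka 0.8) artifact on the classpath; a hypothetical broker address and topic name; a hypothetical helper `extractUserId` that assumes messages look like "userId,payload"; and a hypothetical helper `tick` that reads the counter as "number of consecutive batches since the user was last seen", which is only one possible interpretation. A plain driver-side map stands in for the accumulableCollection, since the body of foreachRDD runs on the driver.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object IdleCounters {

  // Hypothetical message format: "userId,payload". Adjust to the real events.
  def extractUserId(message: String): String = message.takeWhile(_ != ',')

  // Hypothetical per-batch update: every known user ages by one batch,
  // and every user seen in this batch is reset to 0.
  def tick(counters: Map[String, Long], seen: Set[String]): Map[String, Long] = {
    val aged = counters.map { case (user, n) => user -> (n + 1) }
    aged ++ seen.map(_ -> 0L)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("IdleCounters")
    val ssc = new StreamingContext(conf, Seconds(3)) // 3-second batches, as in the thread

    // Assumed values, not taken from the original post.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set("events")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Driver-side state; only the foreachRDD body below touches it.
    var counters = Map.empty[String, Long]

    // foreachRDD is invoked once per batch interval, including batches
    // that contain no Kafka messages, so no fake events are needed.
    stream.foreachRDD { rdd =>
      // Runs on the executors; only the distinct user IDs come back to the driver.
      val seen = rdd.map { case (_, value) => extractUserId(value) }
        .distinct()
        .collect()
        .toSet
      counters = tick(counters, seen)
      println(counters)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because there is no queue-filling loop, nothing blocks the driver thread before the streaming context is started. In the quoted code, the lines after a `while (true)` loop are never reached. Collecting the distinct user IDs to the driver is fine for a sketch, but with a very large number of users per batch a keyed state operation such as updateStateByKey may be a better fit.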
