Indeed, you are right! I felt like I was missing or misunderstanding
something.
Thank you so much!
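
For anyone who finds this thread later, here is a minimal sketch of the suggestion quoted below, as I understand it. It is untested and rests on several assumptions: the Spark 1.x direct Kafka stream from the spark-streaming-kafka artifact, a placeholder broker ("broker1:9092") and topic ("events"), a hypothetical extractUserId helper that treats the first comma-separated field as the user ID, and a counter that simply counts batches since a user was first seen. The point is only that foreachRDD is expected to fire on every batch, so no fake events are needed.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.mutable

object UserCounters {
  // Hypothetical parser: assumes the user ID is the first comma-separated field.
  def extractUserId(event: String): String = event.split(",")(0)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UserCounters")
    // 3-second batches, as in the original question.
    val ssc = new StreamingContext(conf, Seconds(3))

    val userCounts =
      ssc.sparkContext.accumulableCollection(new mutable.HashMap[String, Long]())

    // Placeholder broker and topic names; replace with real ones.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val topics = Set("events")

    val stream =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // This body runs on the driver once per batch; the expectation in this
      // thread is that it also runs when the batch holds no Kafka messages.
      val seen = rdd.map { case (_, value) => extractUserId(value) }.distinct().collect()

      // Snapshot the current counters (readable on the driver only).
      val known = userCounts.value.toMap

      // Bump every known user and every newly seen user by one batch tick.
      val updated = (known.keySet ++ seen).map(id => id -> (known.getOrElse(id, 0L) + 1L))

      // Adding a (key, value) pair to the HashMap replaces the old entry.
      updated.foreach(entry => userCounts += entry)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

All counter updates here happen on the driver inside foreachRDD, so nothing blocks the main thread before ssc.start() is called.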

Ali Gouta.

On Thu, Dec 10, 2015 at 10:04 PM, Cody Koeninger <[email protected]> wrote:

> I'm a little confused as to why you have fake events rather than just
> doing foreachRDD or foreachPartition on your Kafka stream and updating the
> accumulator there. I'd expect that to run each batch even if the batch had
> 0 Kafka messages in it.
>
>
> On Thu, Dec 10, 2015 at 2:05 PM, AliGouta <[email protected]> wrote:
>
>> I am running out of options. In my Spark Streaming application, I want to
>> keep state on some keys. I am getting events from Kafka, and I extract
>> keys from each event, say userID. When there are no events coming from
>> Kafka, I want to keep updating a counter for each userID every 3 seconds,
>> since I configured the batch duration of my StreamingContext to 3 seconds.
>>
>> Now the way I am doing it might be ugly, but at least it works: I have an
>> accumulableCollection like this:
>>
>> val userID = ssc.sparkContext.accumulableCollection(new mutable.HashMap[String, Long]())
>>
>> Then I create a "fake" event and keep pushing it to my Spark Streaming
>> context as follows:
>>
>> val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
>> for (i <- 1 to 100) {
>>   rddQueue += ssc.sparkContext.makeRDD(Seq("FAKE_MESSAGE"))
>>   Thread.sleep(3000)
>> }
>> val inputStream = ssc.queueStream(rddQueue)
>>
>> inputStream.foreachRDD( UPDATE_MY_ACCUMULATOR )
>>
>> This lets me access my accumulableCollection and update the counters of
>> all userIDs. So far everything works fine. However, when I change my
>> loop from:
>>
>> for (i <- 1 to 100) {}  // this is for testing
>>
>> To:
>>
>> // this is to let me access and update my accumulator through the whole
>> // application life cycle
>> while (true) {}
>>
>> Then, when I run ./spark-submit, my application gets stuck at this stage:
>>
>> 15/12/10 18:09:00 INFO BlockManagerMasterActor: Registering block manager
>> slave1.cluster.example:38959 with 1060.3 MB RAM, BlockManagerId(1,
>> slave1.cluster.example, 38959)
>>
>> Any clue on how to resolve this? Is there a straightforward way to update
>> the values for my userIDs, rather than creating a useless RDD and pushing
>> it periodically to the queueStream?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Replaying-an-RDD-in-spark-streaming-to-update-an-accumulator-tp25672.html
>
