Oh - and one other note on this, which appears to be the case: if, in your stream's foreachRDD implementation, you do something stupid (like calling rdd.count()), for example:
tweetStream.foreachRDD((rdd, time) => {
  tweetStream.repartition(1)  // probably meant rdd.repartition(1) - repartitioning the DStream here does nothing useful
  numTweetsCollected += 1
  // val count = rdd.count()  // DON'T DO THIS! count() is a blocking action
})
You can also get stuck in a situation where your RDD processor blocks
infinitely.
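If you still need a per-batch count, one way to avoid blocking inside foreachRDD is to count at the DStream level. This is a hedged sketch, not the original poster's code - it assumes tweetStream is an ordinary DStream in a running StreamingContext. Also note that a receiver permanently occupies one core, so a master of local[1] leaves nothing for processing and the stream blocks forever; use local[2] or more.

```scala
// Sketch only: assumes `tweetStream` already exists in a running
// StreamingContext. DStream.count() yields a DStream[Long] whose counts are
// computed by the streaming scheduler, instead of blocking your own
// foreachRDD body with rdd.count().
tweetStream.count().foreachRDD { rdd =>
  // rdd is a one-element RDD[Long] built by the framework
  rdd.collect().foreach(c => println("tweets this batch: " + c))
}
```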
And for Twitter-specific stuff, make sure to look at modifying the
TwitterInputDStream class so that it implements the fix from SPARK-2464;
that issue can lead to infinite stream reopening as well.
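The non-blocking onStart() pattern described in the quoted mail below can be sketched without Spark at all. MockReceiver and its store() here are hypothetical stand-ins for the real receiver API - the point is just that onStart() launches a worker thread and returns immediately:

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a Spark receiver; names are illustrative only.
class MockReceiver {
  val stored = new AtomicInteger(0)
  private val done = new CountDownLatch(1)

  // In a real receiver this would hand received data to Spark.
  def store(status: String): Unit = stored.incrementAndGet()

  // The key point: onStart() must return immediately, so the
  // receiving loop runs on its own thread.
  def onStart(): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        for (i <- 1 to 100) store("mock status " + i)
        done.countDown()
      }
    })
    worker.setDaemon(true)
    worker.start()  // onStart() returns right away; work continues in background
  }

  def awaitDone(): Unit = done.await()
}

object MockReceiverDemo extends App {
  val r = new MockReceiver
  r.onStart()           // returns immediately
  r.awaitDone()
  println(r.stored.get) // prints 100
}
```

In a real receiver the thread would call Spark's store(...) and you would stop it from onStop().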
On Tue, Oct 21, 2014 at 11:02 AM, jay vyas <[email protected]>
wrote:
> Hi Spark ! I found out why my RDDs weren't coming through in my spark
> stream.
>
> It turns out that onStart() needs to return, it seems - i.e. you need to
> launch the worker part of your start process in a thread. For example:
>
> def onStartMock(): Unit = {
>   // lastMem and mockStatus are fields of the receiver
>   val future = new Thread(new Runnable() {
>     def run() {
>       for (x <- 1 until 1000000000) {
>         val newMem = Runtime.getRuntime.freeMemory() / 12188091
>         if (newMem != lastMem) {
>           System.out.println(" in thread : " + newMem)
>         }
>         lastMem = newMem
>         store(mockStatus)
>       }
>     }
>   })
>   future.start()  // start the thread so onStart() can return immediately
> }
>
> Hope that helps somebody in the same situation. FYI it's in the docs :)
>
> * {{{
> *  class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) {
> *      def onStart() {
> *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
> *          // Must start new thread to receive data, as onStart() must be non-blocking.
> *
> *          // Call store(...) in those threads to store received data into Spark's memory.
> *
> *          // Call stop(...), restart(...) or reportError(...) on any thread based on how
> *          // different errors needs to be handled.
> *
> *          // See corresponding method documentation for more details
> *      }
> *
> *      def onStop() {
> *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
> *      }
> *  }
> * }}}
>
>
--
jay vyas