Jorn meant something like this:

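val filteredStream = twitterStream.transform { rdd =>
  // Re-read the file on every batch so changes are picked up without restarting the stream
  val newRDD = rdd.sparkContext
    .textFile("/this/file/will/be/updated/frequently")
    .map(x => (x, 1))
  // join needs key/value pairs, so rdd must already be keyed by hashtag
  rdd.join(newRDD)
}

newRDD will work like a filter when you do the join: an inner join keeps only the records whose key also appears in the file.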
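
A slightly fuller sketch of the same idea, in case it helps. It assumes the stream has already been keyed by hashtag text and that the file holds one hashtag per line; HashtagFilter, keepAllowed and filterByFile are names made up for this example, not part of Spark.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object, for illustration only.
object HashtagFilter {

  // Keeps only the pairs whose key appears in `allowed`.
  // Assumption: `allowed` holds one hashtag per line, spelled the same way as the keys.
  def keepAllowed[V: ClassTag](tagged: RDD[(String, V)],
                               allowed: RDD[String]): RDD[(String, V)] = {
    val allowedKeys = allowed
      .map(_.trim)
      .filter(_.nonEmpty)
      .distinct()                 // a tag listed twice must not duplicate the matches
      .map(tag => (tag, ()))
    // Inner join drops every key missing from allowedKeys; then discard the marker value.
    tagged.join(allowedKeys).mapValues { case (value, _) => value }
  }

  // transform runs its function once per batch, so the file is re-read each interval.
  def filterByFile[V: ClassTag](tagged: DStream[(String, V)],
                                path: String): DStream[(String, V)] =
    tagged.transform { rdd => keepAllowed(rdd, rdd.sparkContext.textFile(path)) }
}

// Possible usage with the Twitter stream (twitterStream is a DStream[twitter4j.Status]):
//   val tagged = twitterStream.flatMap(s => s.getHashtagEntities.map(h => (h.getText, s)))
//   val filteredStream =
//     HashtagFilter.filterByFile(tagged, "/this/file/will/be/updated/frequently")
```

Note that the join can only narrow down what the receiver already delivers. Tweets that do not match the filters passed to TwitterUtils.createStream never reach the stream, so the initial filters would have to be broad enough to cover hashtags added later.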


Thanks
Best Regards

On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com>
wrote:

> Hi Jorn,
>
> I didn't know that it is possible to change the filter without re-opening
> the Twitter stream. Actually, I already asked that question earlier on
> Stack Overflow
> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
> and got the answer that it's not possible. It would be even better if
> there were some way to add new hashtags or to remove old hashtags that
> a user has stopped following. I guess the second request would be more difficult.
>
> However, it would be great if you could give me a short example of how to
> do this. I didn't quite understand from your explanation what you mean by
> "join it with a rdd loading the newest hash tags from disk in a regular
> interval".
>
> Thanks,
> Zoran
>
> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>
>> Why do you even want to stop it? You can join it with a rdd loading the
>> newest hash tags from disk in a regular interval
>>
>> On Sun, Jul 19, 2015 at 7:40, Zoran Jeremic <zoran.jere...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a twitter spark stream initialized in the following way:
>>>
>>>> val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
>>>> val config = getTwitterConfigurationBuilder.build()
>>>> val auth: Option[twitter4j.auth.Authorization] =
>>>>   Some(new twitter4j.auth.OAuthAuthorization(config))
>>>> val stream = TwitterUtils.createStream(ssc, auth, filters)
>>>>
>>>
>>> This works fine when I initially start it. However, at some point I need
>>> to update filters since users might add new hashtags they want to follow. I
>>> tried to stop the running stream and Spark streaming context without
>>> stopping the Spark context, e.g.:
>>>
>>>
>>>>                stream.stop()
>>>>                ssc.stop(false)
>>>>
>>>
>>> Afterward, I try to initialize a new Twitter stream like I did
>>> previously. However, I get this exception:
>>>
>>>> Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
>>>>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>     at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>     at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>     at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>     at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>     at java.util.TimerThread.run(Timer.java:505)
>>>>  INFO    [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83)         Inflater has been closed
>>>> ERROR    [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75)         Error stopping receiver 0org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>>
>>>
>>> Can anybody explain how to solve this issue?
>>>
>>> Thanks,
>>> Zoran
>>>
>>
>
>
> --
>
> *******************************************************************************
> Zoran Jeremic, PhD
> Senior System Analyst & Programmer
>
> Athabasca University
> Tel: +1 604 92 89 944
> E-mail: zoran.jere...@gmail.com <zoran.jere...@va.mod.gov.rs>
> Homepage:  http://zoranjeremic.org
>
> **********************************************************************************
>
