Jorn meant something like this:
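val filteredStream = twitterStream.transform { rdd =>
  // Re-read on every batch, so changes to the file are picked up without restarting the stream.
  val newRDD = ssc.sparkContext.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
  rdd.join(newRDD)
}
newRDD works like a filter when you do the join: an inner join keeps only
the records whose key also appears in newRDD. Note that rdd has to be a pair
RDD keyed the same way as newRDD (for example, by hashtag) for the join to
compile.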
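
To make the join-as-filter idea a bit more concrete, here is a minimal sketch, not a tested implementation. It assumes Spark 1.3 or later and twitter4j on the classpath, and that the file holds one hashtag per line. The names HashtagFilter, filterByHashtagFile, tweets and path are made up for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import twitter4j.Status

object HashtagFilter {
  // Hypothetical helper: keeps only the statuses that carry a hashtag
  // currently listed in the text file at `path` (one hashtag per line).
  def filterByHashtagFile(tweets: DStream[Status], path: String): DStream[Status] = {
    // Key every status by each of its hashtags (lower-cased; getText has no '#').
    val byTag: DStream[(String, Status)] = tweets.flatMap { status =>
      status.getHashtagEntities.map(tag => (tag.getText.toLowerCase, status)).toSeq
    }

    byTag.transform { rdd =>
      // This function is evaluated once per batch, so the file is re-read
      // at every batch interval and edits to it take effect on the next batch.
      val wanted: RDD[(String, Int)] = rdd.sparkContext
        .textFile(path)
        .map(_.trim.stripPrefix("#").toLowerCase)
        .filter(_.nonEmpty)
        .distinct()
        .map(tag => (tag, 1))

      // Inner join: statuses whose hashtag is not in the file are dropped.
      rdd.join(wanted).map { case (_, (status, _)) => status }
    }
  }
}
```

You would call it as HashtagFilter.filterByHashtagFile(stream, "/this/file/will/be/updated/frequently"). Two caveats: a status that matches several listed hashtags comes out once per match, and this only filters what the receiver already delivers, so as far as I know the filters passed to TwitterUtils.createStream still decide what Twitter sends in the first place.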
Thanks
Best Regards
On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <[email protected]>
wrote:
> Hi Jorn,
>
> I didn't know that it is possible to change the filter without re-opening
> the Twitter stream. Actually, I asked that question earlier on
> Stack Overflow
> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
> and the answer was that it's not possible. It would be even better if
> there were some other way to add new hashtags or to remove old hashtags that
> a user has stopped following. I guess the second request would be more difficult.
>
> However, it would be great if you could give me a short example of how to
> do this. I didn't quite understand from your explanation what you mean by
> "join it with a rdd loading the newest hash tags from disk in a regular
> interval".
>
> Thanks,
> Zoran
>
> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <[email protected]> wrote:
>
>> Why do you even want to stop it? You can join it with a rdd loading the
>> newest hash tags from disk in a regular interval
>>
>> On Sun, Jul 19, 2015 at 7:40 AM, Zoran Jeremic <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a twitter spark stream initialized in the following way:
>>>
>>>> val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
>>>> val config = getTwitterConfigurationBuilder.build()
>>>> val auth: Option[twitter4j.auth.Authorization] =
>>>>   Some(new twitter4j.auth.OAuthAuthorization(config))
>>>> val stream = TwitterUtils.createStream(ssc, auth, filters)
>>>>
>>>
>>> This works fine when I initially start it. However, at some point I need
>>> to update the filters, since users might add new hashtags they want to
>>> follow. I tried to stop the running stream and the Spark streaming context
>>> without stopping the Spark context, e.g.:
>>>
>>>
>>>> stream.stop()
>>>> ssc.stop(false)
>>>>
>>>
>>> Afterward, I try to initialize a new Twitter stream as I did
>>> previously. However, I get this exception:
>>>
>>>> Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
>>>>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>     at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>     at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>     at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>     at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>     at java.util.TimerThread.run(Timer.java:505)
>>>> INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
>>>> ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0
>>>> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>>
>>>
>>> Can anybody explain how to solve this issue?
>>>
>>> Thanks,
>>> Zoran
>>>
>>
>
>
> --
>
> *******************************************************************************
> Zoran Jeremic, PhD
> Senior System Analyst & Programmer
>
> Athabasca University
> Tel: +1 604 92 89 944
> E-mail: [email protected] <[email protected]>
> Homepage: http://zoranjeremic.org
>
> **********************************************************************************
>