Jorn meant something like this:

    val filteredStream = twitterStream.transform { rdd =>
      // Re-read the hashtag file on every batch (one "#tag" per line assumed)
      val newRDD = ssc.sparkContext.textFile("/this/file/will/be/updated/frequently").map(tag => (tag, 1))
      // Key statuses by hashtag; the join keeps only statuses whose
      // hashtag also appears in newRDD
      rdd.flatMap(s => s.getHashtagEntities.map(h => ("#" + h.getText, s)))
        .join(newRDD)
        .map { case (_, (status, _)) => status }
    }

newRDD will work like a filter when you do the join.
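If the pair-RDD join feels heavyweight, here is an equivalent minimal sketch (untested; it assumes the same one-hashtag-per-line file, with the leading '#') that collects the current tags to the driver once per batch and uses a plain filter:

    val filteredStream = twitterStream.transform { rdd =>
      // transform runs on the driver once per batch, so this re-reads
      // the (assumed) hashtag file at every batch interval
      val tags = rdd.sparkContext
        .textFile("/this/file/will/be/updated/frequently")
        .collect()
        .toSet
      // Keep a status if any of its hashtags is currently tracked
      rdd.filter(status =>
        status.getHashtagEntities.exists(h => tags.contains("#" + h.getText)))
    }

Either way the StreamingContext never stops, which sidesteps the restart problem below entirely: as the exception says, once a StreamingContext has been stopped it cannot accept new inputs, so a true restart would mean building a brand-new StreamingContext.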
Thanks
Best Regards

On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com> wrote:

> Hi Jorn,
>
> I didn't know that it is possible to change the filter without re-opening
> the twitter stream. Actually, I already asked that question earlier on
> stackoverflow
> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
> and the answer I got was that it's not possible. Still, it would be even
> better if there were some other way to add new hashtags, or to remove old
> hashtags that users have stopped following. I guess the second request
> would be more difficult.
>
> However, it would be great if you could give me a short example of how to
> do this. I didn't quite understand from your explanation what you mean by
> "join it with an RDD loading the newest hashtags from disk at a regular
> interval".
>
> Thanks,
> Zoran
>
> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>
>> Why do you even want to stop it? You can join it with an RDD loading the
>> newest hashtags from disk at a regular interval.
>>
>> On Sun, Jul 19, 2015 at 7:40 AM, Zoran Jeremic <zoran.jere...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a twitter spark stream initialized in the following way:
>>>
>>>>     val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
>>>>     val config = getTwitterConfigurationBuilder.build()
>>>>     val auth: Option[twitter4j.auth.Authorization] =
>>>>       Some(new twitter4j.auth.OAuthAuthorization(config))
>>>>     val stream = TwitterUtils.createStream(ssc, auth, filters)
>>>
>>> This works fine when I initially start it. However, at some point I need
>>> to update the filters, since users might add new hashtags they want to
>>> follow. I tried to stop the running stream and the spark streaming
>>> context without stopping the spark context, e.g.:
>>>
>>>>     stream.stop()
>>>>     ssc.stop(false)
>>>
>>> Afterwards, I tried to initialize a new Twitter stream like I did
>>> previously.
>>> However, I got this exception:
>>>
>>>> Exception in thread "Firestorm JMX Monitor"
>>>> java.lang.IllegalStateException: Adding new inputs, transformations, and
>>>> output operations after stopping a context is not supported
>>>>   at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>   at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>   at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>   at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>   at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>   at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>   at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>   at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>   at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>   at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>   at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>   at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>   at java.util.TimerThread.run(Timer.java:505)
>>>>
>>>> INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]]
>>>>   twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
>>>> ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3]
>>>>   streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0
>>>>   org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>
>>> Can anybody explain how to solve this issue?
>>>
>>> Thanks,
>>> Zoran
>>>
>
> --
> Zoran Jeremic, PhD
> Senior System Analyst & Programmer
>
> Athabasca University
> Tel: +1 604 92 89 944
> E-mail: zoran.jere...@gmail.com <zoran.jere...@va.mod.gov.rs>
> Homepage: http://zoranjeremic.org