Hi,
I have a twitter spark stream initialized in the following way:
val ssc:StreamingContext =
> SparkLauncher.getSparkScalaStreamingContext()
> val config = getTwitterConfigurationBuilder.build()
> val auth: Option[twitter4j.auth.Authorization] =
> Some(new
> twitter4j.auth.OAuthAuthorization(config))
> val stream = TwitterUtils.createStream(ssc, auth, filters)
>
This works fine when I initialy start it. However, at some point I need to
update filters since users might add new hashtags they want to follow. I
tried to stop the running stream and spark streaming context without
stoping spark context, e.g:
> stream.stop()
> ssc.stop(false)
>
Afterward, I'm trying to initialize a new Twitter stream like I did
previously. However, I got this exception:
Exception in thread "Firestorm JMX Monitor"
> java.lang.IllegalStateException: Adding new inputs, transformations, and
> output operations after stopping a context is not supported
> at
> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
> at
> org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
> at
> org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
> at
> org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
> at
> org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
> at
> org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
> at
> org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
> at
> org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
> at
> org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
> at
> org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
> at java.util.TimerThread.mainLoop(Timer.java:555)
> at java.util.TimerThread.run(Timer.java:505)
> INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing
> thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater
> has been closed
> ERROR [2015-07-18 22:24:32,503]
> [sparkDriver-akka.actor.default-dispatcher-3]
> streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error
> stopping receiver
> 0org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>
>
Anybody can explain how to solve this issue?
Thanks,
Zoran