Hi Akhil and Jorn,

I tried as you suggested to create some simple scenario, but I have an
error on rdd.join(newRDD):  "value join is not a member of
org.apache.spark.rdd.RDD[twitter4j.Status]". The code looks like:

    val stream = TwitterUtils.createStream(ssc, auth)
>     val filteredStream= stream.transform(rdd =>{
>     val samplehashtags=Array("music","film")
>     val newRDD= samplehashtags.map { x => (x,1) }
>     rdd.join(newRDD)
>      })
>

Did I miss something here?

Thanks,
Zoran

On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic <zoran.jere...@gmail.com>
wrote:

> Thanks for explanation.
>
> If I understand this correctly, in this approach I would actually stream
> everything from Twitter, and perform filtering in my application using
> Spark. Isn't this too much overhead if my application is interested in
> listening for couple of hundreds or thousands hashtags?
> On one side, this will be better approach since I will not have the
> problem to open new streams if number of hashtags go over 400 which is the
> Twitter limit for User stream filtering, but on the other side I'm concern
> about how much it will affect application performance if I stream
> everything that is posted on Twitter and filter it locally. It would be
> great if somebody with experience on this could comment on these concerns.
>
> Thanks,
> Zoran
>
> On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Jorn meant something like this:
>>
>> val filteredStream = twitterStream.transform(rdd =>{
>>
>> val newRDD =
>> scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x,1))
>>
>> rdd.join(newRDD)
>>
>> })
>>
>> ​newRDD will work like a filter when you do the join.​
>>
>>
>> Thanks
>> Best Regards
>>
>> On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jere...@gmail.com>
>> wrote:
>>
>>> Hi Jorn,
>>>
>>> I didn't know that it is possible to change filter without re-opening
>>> twitter stream. Actually, I already had that question earlier at the
>>> stackoverflow
>>> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
>>> and I got the answer that it's not possible, but it would be even better if
>>> there is some other way to add new hashtags or to remove old hashtags that
>>> user stopped following. I guess the second request would be more difficult.
>>>
>>> However, it would be great if you can give me some short example how to
>>> make this. I didn't understand well from your explanation what you mean by
>>> "join it with a rdd loading the newest hash tags from disk in a regular
>>> interval".
>>>
>>> Thanks,
>>> Zoran
>>>
>>> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfra...@gmail.com>
>>> wrote:
>>>
>>>> Why do you even want to stop it? You can join it with a rdd loading the
>>>> newest hash tags from disk in a regular interval
>>>>
>>>> Le dim. 19 juil. 2015 à 7:40, Zoran Jeremic <zoran.jere...@gmail.com>
>>>> a écrit :
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a twitter spark stream initialized in the following way:
>>>>>
>>>>>               val ssc:StreamingContext =
>>>>>> SparkLauncher.getSparkScalaStreamingContext()
>>>>>>               val config = getTwitterConfigurationBuilder.build()
>>>>>>               val auth: Option[twitter4j.auth.Authorization] =
>>>>>> Some(new
>>>>>>                         twitter4j.auth.OAuthAuthorization(config))
>>>>>>               val stream = TwitterUtils.createStream(ssc, auth,
>>>>>> filters)
>>>>>>
>>>>>
>>>>> This works fine when I initialy start it. However, at some point I
>>>>> need to update filters since users might add new hashtags they want to
>>>>> follow. I tried to stop the running stream and spark streaming context
>>>>> without stoping spark context, e.g:
>>>>>
>>>>>
>>>>>>                stream.stop()
>>>>>>                ssc.stop(false)
>>>>>>
>>>>>
>>>>> Afterward, I'm trying to initialize a new Twitter stream like I did
>>>>> previously. However, I got this exception:
>>>>>
>>>>> Exception in thread "Firestorm JMX Monitor"
>>>>>> java.lang.IllegalStateException: Adding new inputs, transformations, and
>>>>>> output operations after stopping a context is not supported
>>>>>>     at
>>>>>> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>>>     at
>>>>>> org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>>>     at
>>>>>> org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>>>     at
>>>>>> org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>>>     at
>>>>>> org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>>>     at
>>>>>> org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>>>     at
>>>>>> org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>>>     at
>>>>>> org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>>>     at
>>>>>> org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>>>     at
>>>>>> org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>>>     at
>>>>>> org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>>>     at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>>>     at java.util.TimerThread.run(Timer.java:505)
>>>>>>  INFO    [2015-07-18 22:24:23,430] [Twitter Stream
>>>>>> consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl
>>>>>> (SLF4JLogger.java:83)         Inflater has been closed
>>>>>> ERROR    [2015-07-18 22:24:32,503]
>>>>>> [sparkDriver-akka.actor.default-dispatcher-3]
>>>>>> streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75)         
>>>>>> Error
>>>>>> stopping receiver
>>>>>> 0org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>>>>
>>>>>
>>>>>>
>>>>>
>>>>> Anybody can explain how to solve this issue?
>>>>>
>>>>> Thanks,
>>>>> Zoran
>>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> *******************************************************************************
>>> Zoran Jeremic, PhD
>>> Senior System Analyst & Programmer
>>>
>>> Athabasca University
>>> Tel: +1 604 92 89 944
>>> E-mail: zoran.jere...@gmail.com <zoran.jere...@va.mod.gov.rs>
>>> Homepage:  http://zoranjeremic.org
>>>
>>> **********************************************************************************
>>>
>>
>>
>
>
> --
>
> *******************************************************************************
> Zoran Jeremic, PhD
> Senior System Analyst & Programmer
>
> Athabasca University
> Tel: +1 604 92 89 944
> E-mail: zoran.jere...@gmail.com <zoran.jere...@va.mod.gov.rs>
> Homepage:  http://zoranjeremic.org
>
> **********************************************************************************
>



-- 
*******************************************************************************
Zoran Jeremic, PhD
Senior System Analyst & Programmer

Athabasca University
Tel: +1 604 92 89 944
E-mail: zoran.jere...@gmail.com <zoran.jere...@va.mod.gov.rs>
Homepage:  http://zoranjeremic.org
**********************************************************************************

Reply via email to