Hi,

I want to use Twitter's public streaming API to follow a set of user IDs, and I want to implement my driver in Java. The current TwitterUtils is a wrapper around twitter4j and does not expose the full Twitter streaming API.
I started by digging through the source code. Unfortunately, I do not know Scala.

spark-1.5.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
spark-1.5.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala

    String[] filter = {"topic1", "topic2"};
    JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth, filter);

Does anyone know why filters is defined as String[]? Internally, Spark creates a Twitter4J FilterQuery object. Ideally I would like to pass a FilterQuery object myself, since it exposes the part of the Twitter streaming API I need in order to follow a set of users.

Here is a link to the 4.0.4 version of the Javadoc: http://twitter4j.org/oldjavadocs/4.0.4/index.html

It turns out Spark 1.5.1 uses version 3.0.3: http://twitter4j.org/oldjavadocs/3.0.3/index.html

In both versions FilterQuery implements java.io.Serializable.

I put a comment (marked "???") where I think the change needs to go. It looks like it might be trivial. I guess in the short term I can try to rewrite TwitterUtils and TwitterReceiver in Java to do what I need.

Thanks in advance,

Andy

    object TwitterUtils {
      /**
       * Create a input stream that returns tweets received from Twitter.
       * @param ssc         StreamingContext object
       * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
       *        authorization; this uses the system properties twitter4j.oauth.consumerKey,
       *        twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
       *        twitter4j.oauth.accessTokenSecret
       * @param filters Set of filter strings to get only those tweets that match them
       * @param storageLevel Storage level to use for storing the received objects
       */
      def createStream(
          ssc: StreamingContext,
          twitterAuth: Option[Authorization],
          filters: Seq[String] = Nil, // ??? Can we pass a FilterQuery object instead?
          storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
        ): ReceiverInputDStream[Status] = {
        new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
      }

    private[streaming]
    class TwitterReceiver(
        twitterAuth: Authorization,
        filters: Seq[String],
        storageLevel: StorageLevel
      ) extends Receiver[Status](storageLevel) with Logging {

      @volatile private var twitterStream: TwitterStream = _
      @volatile private var stopped = false

      def onStart() {
        try {
          val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
          newTwitterStream.addListener(new StatusListener {
            def onStatus(status: Status): Unit = {
              store(status)
            }
            // Unimplemented
            def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
            def onTrackLimitationNotice(i: Int) {}
            def onScrubGeo(l: Long, l1: Long) {}
            def onStallWarning(stallWarning: StallWarning) {}
            def onException(e: Exception) {
              if (!stopped) {
                restart("Error receiving tweets", e)
              }
            }
          })

          val query = new FilterQuery // ??? Can we pass a FilterQuery object instead?
          if (filters.size > 0) {
            query.track(filters.toArray)
            newTwitterStream.filter(query)
          } else {
            newTwitterStream.sample()
          }
          setTwitterStream(newTwitterStream)
          logInfo("Twitter receiver started")
          stopped = false
        } catch {
          case e: Exception => restart("Error starting Twitter stream", e)
        }
      }
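One detail for the short-term Java rewrite: unlike track, which takes strings, FilterQuery.follow takes the user IDs as a long[]. Below is a minimal sketch of how the driver might prepare that array from IDs read as strings (for example from command-line arguments). The class name FollowIds and its parse method are my own hypothetical helper, not part of Spark or Twitter4J, and dropping duplicates and rejecting non-positive values is just an assumption about what is sensible here.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper, not part of Spark or Twitter4J. */
final class FollowIds {
    private FollowIds() {}

    /**
     * Parses numeric user IDs into a long[], dropping duplicates and
     * keeping first-seen order.
     *
     * @throws IllegalArgumentException if an entry is not a positive decimal number
     */
    static long[] parse(Collection<String> rawIds) {
        Set<Long> unique = new LinkedHashSet<Long>();
        for (String raw : rawIds) {
            long id;
            try {
                id = Long.parseLong(raw.trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Not a numeric user ID: " + raw, e);
            }
            if (id <= 0) {
                throw new IllegalArgumentException("User IDs must be positive: " + raw);
            }
            unique.add(id);
        }
        // Unbox into the primitive array that follow() expects.
        long[] ids = new long[unique.size()];
        int i = 0;
        for (long id : unique) {
            ids[i++] = id;
        }
        return ids;
    }
}
```

In a rewritten receiver the result would then go to Twitter4J in place of the track call, along the lines of query.follow(FollowIds.parse(ids)) followed by newTwitterStream.filter(query). I have not tested that part against Spark 1.5.1.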