Hi
I want to use Twitter's public streaming API to follow a set of user IDs, and I
want to implement my driver in Java. The current TwitterUtils is a wrapper
around twitter4j and does not expose the full Twitter streaming API.
I started by digging through the source code. Unfortunately, I do not know
Scala. These are the files I looked at:
spark-1.5.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
spark-1.5.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
Calling it from Java currently looks like this:
String[] filter = {"topic1", "topic2"};
JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth, filter);
Does anyone know why filters is defined as a String[]? Internally, Spark
creates a twitter4j FilterQuery object anyway. Ideally I would like to pass in
a FilterQuery object myself, because it exposes the part of the Twitter
streaming API I need to follow a set of users (FilterQuery.follow).
Here is a link to the 4.0.4 version of the Javadoc:
http://twitter4j.org/oldjavadocs/4.0.4/index.html
It turns out Spark 1.5.1 uses version 3.0.3:
http://twitter4j.org/oldjavadocs/3.0.3/index.html
In both versions FilterQuery implements java.io.Serializable.
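Serializable matters here because Spark serializes a receiver and ships it to an executor. Any query object the receiver holds as a field therefore has to survive Java serialization. Below is a minimal, stdlib-only sketch of that round trip. FollowQuery and QueryHolder are hypothetical stand-ins, loosely modelled on twitter4j's FilterQuery (follow/track) and on a receiver. They are not the real twitter4j or Spark classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

public class QuerySerializationDemo {

    // HYPOTHETICAL stand-in for twitter4j.FilterQuery (not the real class):
    // holds user ids to follow and keywords to track, and is Serializable.
    public static final class FollowQuery implements Serializable {
        private static final long serialVersionUID = 1L;
        private long[] follow = new long[0];
        private String[] track = new String[0];

        public FollowQuery follow(long... ids) {
            this.follow = ids.clone();
            return this;
        }

        public FollowQuery track(String... terms) {
            this.track = terms.clone();
            return this;
        }

        public long[] getFollow() { return follow.clone(); }

        public String[] getTrack() { return track.clone(); }
    }

    // HYPOTHETICAL stand-in for a receiver: the query is a field, so
    // serializing the holder also serializes the query it carries.
    public static final class QueryHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        public final FollowQuery query;

        public QueryHolder(FollowQuery query) { this.query = query; }
    }

    // Writes the object with plain Java serialization and reads it back,
    // simulating (in one JVM) what happens when it is shipped elsewhere.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        QueryHolder original =
            new QueryHolder(new FollowQuery().follow(12L, 34L).track("topic1", "topic2"));
        QueryHolder copy = roundTrip(original);
        System.out.println(Arrays.toString(copy.query.getFollow())); // [12, 34]
        System.out.println(Arrays.toString(copy.query.getTrack()));  // [topic1, topic2]
    }
}
```

If the deserialized copy still has its follow ids and track terms, a receiver holding a Serializable query field should be able to carry it to the executor the same way. That is an assumption about how a custom receiver would be written, not something Spark 1.5.1 does today.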
I put a comment in the code below where I think the change needs to go. It
looks like it might be trivial. In the short term, I guess I can try to
rewrite TwitterUtils and TwitterReceiver in Java to do what I need.
Thanks in advance
Andy
object TwitterUtils {
  /**
   * Create an input stream that returns tweets received from Twitter.
   * @param ssc StreamingContext object
   * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
   *        authorization; this uses the system properties twitter4j.oauth.consumerKey,
   *        twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
   *        twitter4j.oauth.accessTokenSecret
   * @param filters Set of filter strings to get only those tweets that match them
   * @param storageLevel Storage level to use for storing the received objects
   */
  def createStream(
      ssc: StreamingContext,
      twitterAuth: Option[Authorization],
      filters: Seq[String] = Nil, // ??? Can we pass a FilterQuery object instead?
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[Status] = {
    new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
  }
}
// From TwitterInputDStream.scala
private[streaming]
class TwitterReceiver(
    twitterAuth: Authorization,
    filters: Seq[String],
    storageLevel: StorageLevel
  ) extends Receiver[Status](storageLevel) with Logging {

  @volatile private var twitterStream: TwitterStream = _
  @volatile private var stopped = false

  def onStart() {
    try {
      val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
      newTwitterStream.addListener(new StatusListener {
        def onStatus(status: Status): Unit = {
          store(status)
        }
        // Unimplemented
        def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
        def onTrackLimitationNotice(i: Int) {}
        def onScrubGeo(l: Long, l1: Long) {}
        def onStallWarning(stallWarning: StallWarning) {}
        def onException(e: Exception) {
          if (!stopped) {
            restart("Error receiving tweets", e)
          }
        }
      })

      val query = new FilterQuery // ??? Can we pass a FilterQuery object instead?
      if (filters.size > 0) {
        query.track(filters.toArray)
        newTwitterStream.filter(query)
      } else {
        newTwitterStream.sample()
      }
      setTwitterStream(newTwitterStream)
      logInfo("Twitter receiver started")
      stopped = false
    } catch {
      case e: Exception => restart("Error starting Twitter stream", e)
    }
  }

  // ... (rest of TwitterReceiver not shown)
}