Hi

I want to use Twitter's public streaming API to follow a set of user IDs, and I
want to implement my driver in Java. The current TwitterUtils is a wrapper
around twitter4j and does not expose the full Twitter streaming API.

I started by digging through the source code. Unfortunately, I do not know
Scala.

spark-1.5.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
spark-1.5.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala

        String[] filter = {"topic1", "topic2"};

        JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth, filter);


Does anyone know why filters is defined as String[]? Internally, Spark
creates a twitter4j FilterQuery object from it. Ideally I would like to pass
in a FilterQuery object myself, because it exposes the part of the Twitter
streaming API I need in order to follow a set of users.

Here is a link to the 4.0.4 version of the twitter4j Javadoc:
http://twitter4j.org/oldjavadocs/4.0.4/index.html

It turns out Spark 1.5.1 uses version 3.0.3:
http://twitter4j.org/oldjavadocs/3.0.3/index.html . In both versions
FilterQuery implements java.io.Serializable.

I have marked with "???" comments in the source below where I think the
change needs to go. It looks like it might be trivial. In the short term I
guess I can try to rewrite TwitterUtils and TwitterReceiver in Java to do
what I need.
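
To make the idea concrete, here is a rough Java sketch of the branch I have in mind. It does not use twitter4j or Spark at all: QuerySketch is a hypothetical stand-in for twitter4j's FilterQuery, and ReceiverSketch, chooseEndpoint and fromKeywords are names I made up for illustration. It only mirrors the filter-or-sample decision in TwitterReceiver.onStart() quoted further down, assuming the receiver were handed a query object instead of a list of strings.

```java
// Hypothetical stand-in for twitter4j.FilterQuery, so this compiles without twitter4j.
final class QuerySketch {
    final long[] follow;   // user ids to follow
    final String[] track;  // keywords to track

    QuerySketch(long[] follow, String[] track) {
        // Treat null as "nothing requested" and copy defensively.
        this.follow = (follow == null) ? new long[0] : follow.clone();
        this.track = (track == null) ? new String[0] : track.clone();
    }

    boolean isEmpty() {
        return follow.length == 0 && track.length == 0;
    }
}

// Hypothetical stand-in for the decision made inside TwitterReceiver.onStart().
final class ReceiverSketch {
    // Use the filter endpoint when the query asks for anything, else fall back to sample.
    static String chooseEndpoint(QuerySketch query) {
        return (query != null && !query.isEmpty()) ? "filter" : "sample";
    }

    // The existing String[] style of call expressed in terms of a query object.
    static QuerySketch fromKeywords(String... filters) {
        return new QuerySketch(null, filters);
    }

    public static void main(String[] args) {
        QuerySketch byUser = new QuerySketch(new long[] {12345L, 67890L}, null);
        System.out.println(chooseEndpoint(byUser));         // prints "filter"
        System.out.println(chooseEndpoint(fromKeywords())); // prints "sample"
    }
}
```

If something along these lines is acceptable, the existing keyword-based createStream could presumably stay as a thin wrapper that builds the query from the strings, so current callers would not break.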

Thanks in advance

Andy

object TwitterUtils {
  /**
   * Create a input stream that returns tweets received from Twitter.
   * @param ssc         StreamingContext object
   * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
   *        authorization; this uses the system properties twitter4j.oauth.consumerKey,
   *        twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
   *        twitter4j.oauth.accessTokenSecret
   * @param filters Set of filter strings to get only those tweets that match them
   * @param storageLevel Storage level to use for storing the received objects
   */
  def createStream(
      ssc: StreamingContext,
      twitterAuth: Option[Authorization],
      filters: Seq[String] = Nil, // ??? Can we pass a FilterQuery object instead
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[Status] = {
    new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
  }





private[streaming]
class TwitterReceiver(
    twitterAuth: Authorization,
    filters: Seq[String],
    storageLevel: StorageLevel
  ) extends Receiver[Status](storageLevel) with Logging {

  @volatile private var twitterStream: TwitterStream = _
  @volatile private var stopped = false

  def onStart() {
    try {
      val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
      newTwitterStream.addListener(new StatusListener {
        def onStatus(status: Status): Unit = {
          store(status)
        }
        // Unimplemented
        def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
        def onTrackLimitationNotice(i: Int) {}
        def onScrubGeo(l: Long, l1: Long) {}
        def onStallWarning(stallWarning: StallWarning) {}
        def onException(e: Exception) {
          if (!stopped) {
            restart("Error receiving tweets", e)
          }
        }
      })

      val query = new FilterQuery // ??? Can we pass a FilterQuery object instead
      if (filters.size > 0) {
        query.track(filters.toArray)
        newTwitterStream.filter(query)
      } else {
        newTwitterStream.sample()
      }
      setTwitterStream(newTwitterStream)
      logInfo("Twitter receiver started")
      stopped = false
    } catch {
      case e: Exception => restart("Error starting Twitter stream", e)
    }
  }





