Dear all,

Lately, as part of a research project, I've been developing an
application that streams (or at least should stream) data from Travis CI and
GitHub using their REST APIs. The purpose is to gain insight into
the commit-build relationship so that I can run further analyses on it.

For this, I've implemented the following Travis custom receiver:


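// Spark imports (assuming Spark 1.x, where Logging lives in org.apache.spark).
// Build, BuildStream and BuildListener come from my own Travis library.
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

object TravisUtils {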

  def createStream(ctx: StreamingContext, storageLevel: StorageLevel): ReceiverInputDStream[Build] =
    new TravisInputDStream(ctx, storageLevel)

}

private[streaming]
class TravisInputDStream(ctx: StreamingContext, storageLevel: StorageLevel)
  extends ReceiverInputDStream[Build](ctx) {

  def getReceiver() : Receiver[Build] = new TravisReceiver(storageLevel)

}

private[streaming]
class TravisReceiver(storageLevel: StorageLevel)
  extends Receiver[Build](storageLevel) with Logging {

  def onStart() : Unit = {
    new BuildStream().addListener(new BuildListener {
      // The batch-size notification is not needed here, so it is ignored.
      override def onBuildsReceived(numberOfBuilds: Int): Unit = {}

      override def onBuildRepositoryReceived(build: Build): Unit = {
        store(build)
      }

      override def onException(e: Exception): Unit = {
        reportError("Exception while streaming travis", e)
      }
    })

  }

  def onStop(): Unit = {
    // Nothing is cleaned up here; the BuildStream created in onStart() is not stopped.
  }
}

The receiver uses my custom-made Travis API library (developed in
Java using Apache Async Client). The problem is the following: the
data that I should be receiving is continuous and keeps changing, i.e. it is
being pushed to Travis and GitHub constantly. As an example, GitHub records
approximately 350 events per second, including push events,
commit comments and similar.

When streaming either GitHub or Travis, I do get data in the
first two batches, but afterwards the RDDs that are part of the DStream are
empty, although there is data to be streamed!

So far I've checked a couple of things, including the HttpClient used for
issuing requests to the API, but none of them actually solved the
problem.

Therefore, my question is: what could be going on? Why does Spark stop
streaming the data after some period x has passed? Below you can find the
context setup and configuration:

val configuration = new SparkConf()
  .setAppName("StreamingSoftwareAnalytics")
  .setMaster("local[2]")

val ctx = new StreamingContext(configuration, Seconds(3))

val stream = GitHubUtils.createStream(ctx, StorageLevel.MEMORY_AND_DISK_SER)

// RDD IS EMPTY - that is what is happening!
stream.window(Seconds(9)).foreachRDD { rdd =>
  if (rdd.isEmpty()) {
    println("RDD IS EMPTY")
  } else {
    rdd.collect().foreach(event => println(event.getRepo.getName + " " + event.getId))
  }
}

ctx.start()
ctx.awaitTermination()
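
To make the timing concrete, here is a minimal plain-Scala sketch (no Spark involved) of how the 9-second window relates to the 3-second batches. It assumes that window(Seconds(9)) slides by the batch interval, which as far as I understand is the default when no slide duration is passed. The object name WindowSketch and the per-batch record counts are made up for illustration only.

```scala
// Plain Scala model of the windowing in the setup above; not Spark code.
object WindowSketch {
  val batchSeconds = 3   // batch interval from the StreamingContext
  val windowSeconds = 9  // window length passed to window(...)
  val batchesPerWindow: Int = windowSeconds / batchSeconds

  /** 0-based indices of the batches covered by the window ending at batch `i`. */
  def batchesInWindow(i: Int): Seq[Int] =
    math.max(0, i - batchesPerWindow + 1) to i

  /** A window is empty only if every batch it covers is empty. */
  def windowIsEmpty(batchSizes: Seq[Int], i: Int): Boolean =
    batchesInWindow(i).forall(b => batchSizes(b) == 0)

  def main(args: Array[String]): Unit = {
    // Hypothetical record counts: data arrives in the first two batches only.
    val batchSizes = Seq(120, 80, 0, 0, 0, 0)
    batchSizes.indices.foreach { i =>
      val covered = batchesInWindow(i).mkString(",")
      println(s"window ending at batch $i covers [$covered], empty = ${windowIsEmpty(batchSizes, i)}")
    }
  }
}
```

With these made-up counts, the window first comes out empty at batch index 4, i.e. three batches after the last non-empty one. So when "RDD IS EMPTY" is printed, the receiver has stored nothing for three consecutive batches, not just for one.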

Thanks in advance!   
