Dear all,

Lately, as part of a scientific research project, I have been developing an application that streams (or at least should stream) data from Travis CI and GitHub using their REST APIs. The purpose is to gain insight into the commit-build relationship in order to perform further analyses.
For this, I have implemented the following Travis custom receiver:

object TravisUtils {
  def createStream(ctx: StreamingContext, storageLevel: StorageLevel): ReceiverInputDStream[Build] =
    new TravisInputDStream(ctx, storageLevel)
}

private[streaming] class TravisInputDStream(ctx: StreamingContext, storageLevel: StorageLevel)
  extends ReceiverInputDStream[Build](ctx) {
  def getReceiver(): Receiver[Build] = new TravisReceiver(storageLevel)
}

private[streaming] class TravisReceiver(storageLevel: StorageLevel)
  extends Receiver[Build](storageLevel) with Logging {

  def onStart(): Unit = {
    new BuildStream().addListener(new BuildListener {
      override def onBuildsReceived(numberOfBuilds: Int): Unit = { }
      override def onBuildRepositoryReceived(build: Build): Unit = {
        store(build)
      }
      override def onException(e: Exception): Unit = {
        reportError("Exception while streaming Travis", e)
      }
    })
  }

  def onStop(): Unit = { }
}

The receiver uses my custom-made Travis API library (developed in Java using the Apache Async Client).

The problem is the following: the data I should be receiving is continuous and changes constantly, i.e. new events are being pushed to Travis and GitHub all the time. As an example, GitHub records approximately 350 events per second, including push events, commit comments and similar. However, when streaming either GitHub or Travis, I do get data in the first two batches, but afterwards the RDDs that make up the DStream are empty, even though there is data to be streamed. I have checked a couple of things so far, including the HttpClient used for issuing requests to the API, but none of them solved the problem.

Therefore, my question is: what could be going on? Why does Spark stop streaming the data after a certain period? Below is the streaming context and configuration:

val configuration = new SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]")
val ctx = new StreamingContext(configuration, Seconds(3))

val stream = GitHubUtils.createStream(ctx, StorageLevel.MEMORY_AND_DISK_SER)

// RDD IS EMPTY - that is what is happening!
stream.window(Seconds(9)).foreachRDD(rdd => {
  if (rdd.isEmpty()) {
    println("RDD IS EMPTY")
  } else {
    rdd.collect().foreach(event => println(event.getRepo.getName + " " + event.getId))
  }
})

ctx.start()
ctx.awaitTermination()

Thanks in advance!
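P.S. To narrow down whether the problem lies in Spark itself or in my Travis/GitHub listeners, I am considering swapping in a dummy receiver that produces data on its own from a background thread: if its RDDs also turn up empty after the first batches, the issue would be on the Spark side rather than in my libraries. Below is a minimal sketch using only the standard Spark Streaming Receiver API; DummyReceiver and the generated "tick" strings are hypothetical placeholders, not part of my actual code:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Diagnostic receiver: emits one record per second so that every batch
// should contain data, independently of Travis/GitHub.
class DummyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {

  def onStart(): Unit = {
    // onStart() must return quickly, so the work runs on a separate thread.
    new Thread("dummy-receiver") {
      override def run(): Unit = {
        var i = 0L
        while (!isStopped()) {
          store(s"tick-$i")   // hand one record to Spark
          i += 1
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = { } // the thread checks isStopped() and exits on its own
}

It would be wired up with the same context as above:

val dummyStream = ctx.receiverStream(new DummyReceiver(StorageLevel.MEMORY_AND_DISK_SER))
dummyStream.foreachRDD(rdd => println("dummy count: " + rdd.count()))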