Recently, I've implemented the following Receiver and custom Spark Streaming InputDStream using Scala:
// Imports assuming Spark 1.x and the egit-github client (org.eclipse.egit.github.core)
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

import org.eclipse.egit.github.core.client.{GitHubClient, PageIterator}
import org.eclipse.egit.github.core.event.Event
import org.eclipse.egit.github.core.service.EventService

import scala.collection.JavaConversions._

/**
 * The GitHubUtils object declares an interface consisting of overloaded createStream
 * functions. The createStream function takes as arguments the ctx : StreamingContext
 * passed by the driver program, along with the storageLevel : StorageLevel, and returns
 * a GitHubInputDStream, i.e. a DStream representation derived from the abstract class
 * ReceiverInputDStream.
 */
object GitHubUtils {

  def createStream(ctx: StreamingContext, storageLevel: StorageLevel): ReceiverInputDStream[Event] =
    new GitHubInputDStream(ctx, storageLevel)
}

/**
 * The GitHubInputDStream class takes as constructor arguments a ctx : StreamingContext
 * and a storageLevel : StorageLevel. It inherits from the ReceiverInputDStream abstract
 * class declared within Spark Streaming. In summary, the GitHubInputDStream is a DStream
 * representation of GitHub events, overriding the getReceiver() function that returns a
 * Receiver[Event] object.
 */
private[streaming] class GitHubInputDStream(ctx: StreamingContext, storageLevel: StorageLevel)
  extends ReceiverInputDStream[Event](ctx) {

  // Client is a GitHubClient instance defined elsewhere (not shown here)
  def getReceiver(): Receiver[Event] = new GitHubReceiver(storageLevel, Client)
}

/**
 * The GitHubReceiver class takes as constructor arguments a storageLevel : StorageLevel
 * and a client : GitHubClient. It overrides the two functions declared by the Receiver
 * interface, onStart() and onStop(): onStart() is called when the receiver is started
 * and onStop() when it is stopped.
 */
private[streaming] class GitHubReceiver(storageLevel: StorageLevel, client: GitHubClient)
  extends Receiver[Event](storageLevel) with Logging {

  def onStart(): Unit = {
    consumeEvents(new EventService(client).pagePublicEvents(0, 300))
  }

  def consumeEvents(iterator: PageIterator[Event]): Unit = iterator.hasNext match {
    case true  => iterator.next.toList.foreach(event => store(event)); consumeEvents(iterator)
    case false => logInfo("Processing is stopping")
  }

  def onStop(): Unit = { }
}

I then initialise the stream in the driver program on my local machine and apply a series of functions, e.g. flatMap, to the resulting DStream[Event]:

val configuration = new SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]")
val streamingContext = new StreamingContext(configuration, Seconds(5))
val stream = GitHubUtils.createStream(streamingContext, StorageLevel.MEMORY_AND_DISK_SER)
val timestamps = stream.flatMap(event => event.getCreatedAt.toString)

When I then apply a series of functions such as reduceByKey, which should allow me to count e.g. the number of events per second, I get the following output, where the key turns out to be a Char:

(T,100)
(d,100)
(4,100)
(8,13)
(6,114)
(0,366)

while the output should be in the form of e.g.:

(2016-26-02 00:00:01,100)
(2016-26-02 00:00:02,100)
(2016-26-02 00:00:03,100)
(2016-26-02 00:00:04,13)
(2016-26-02 00:00:05,114)
(2016-26-02 00:00:06,366)

The root of the problem is that when flatMap is applied to an event - a serialisable object containing a member variable getCreatedAt : Date - it produces a DStream[Char] rather than a DStream[String], meaning that Spark somehow splits the date String using some delimiter. I've also tried to perform the computation on the timestamps by first using foreachRDD on the DStream of events and then using collect to get the full String representation of the date - and then it works.
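As a side note, I see the same Char behaviour with plain Scala collections, completely outside Spark, so it does not seem to be specific to my Receiver. A minimal sketch, using java.util.Date directly instead of my Event class:

import java.util.Date

val dates = List(new Date(0L), new Date(1000L))

// flatMap treats each String as a collection of its characters,
// so the result is a List[Char] rather than a List[String]
val flattened: List[Char] = dates.flatMap(date => date.toString)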
However, since collect can be quite expensive, I would like to avoid it, and I think there must be a better solution to this. Therefore, my questions are: first, how exactly do I create from the DStream[Event] a DStream[String] (instead of a DStream[Char]), where each string in the DStream represents a timestamp from an RDD? Secondly, can someone give some good examples of this? And thirdly, which functions are best to use if I would like to e.g. aggregate all events per repository ID? That is, each Event object contains a getRepository() function that returns the ID : Long of the GitHub repository, and I would like to map each streamed event belonging to a certain repository to its corresponding repository ID, in the form of (Long, [Event]).

Thanks in advance!
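P.S. To make the third question a bit more concrete, the rough shape I have in mind is something along the following lines. This is only a sketch and assumes, as I described above, that each Event exposes the repository ID as a Long via getRepository(); groupByKey is just one candidate and may well not be the best choice:

val eventsByRepository = stream
  .map(event => (event.getRepository, event)) // (repository ID : Long, Event)
  .groupByKey()                               // DStream[(Long, Iterable[Event])]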