Using flatMap on a String will treat it as a sequence of characters, which is why you are getting a DStream of Char. I think you want to just do a map instead, like this:
val timestamps = stream.map(event => event.getCreatedAt.toString)
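If the goal is the per-second counts from your example, you can then map each event to a (formattedTimestamp, 1) pair and reduce by key. A minimal sketch, assuming getCreatedAt returns a java.util.Date and that formatting it down to the second is what you want (the exact date pattern below is just an assumption on my part):

val perSecondCounts = stream
  .map { event =>
    // SimpleDateFormat is not thread-safe, so create it inside the closure
    val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    (fmt.format(event.getCreatedAt), 1)
  }
  .reduceByKey(_ + _)   // DStream[(String, Int)]: one count per second

perSecondCounts.print()

Date.toString would also give you a full String key, it just won't match the timestamp format you listed. For the per-repository grouping you asked about, see the sketch below the quoted message.

On Feb 25, 2016 8:27 AM, "Dominik Safaric" <dominiksafa...@gmail.com> wrote:
> Recently, I've implemented the following Receiver and custom Spark Streaming
> InputDStream using Scala:
>
> /**
>  * The GitHubUtils object declares an interface consisting of overloaded
>  * createStream functions. The createStream function takes as arguments the
>  * ctx : StreamingContext passed by the driver program, along with the
>  * storageLevel : StorageLevel, returning a GitHubInputDStream. The
>  * GitHubInputDStream is a DStream representation, i.e. a derivation of the
>  * abstract class ReceiverInputDStream.
>  */
>
> object GitHubUtils {
>
>   def createStream(ctx: StreamingContext, storageLevel: StorageLevel): ReceiverInputDStream[Event] =
>     new GitHubInputDStream(ctx, storageLevel)
> }
>
> /**
>  * The GitHubInputDStream class takes as constructor arguments a ctx :
>  * StreamingContext and a storageLevel : StorageLevel. The class inherits from
>  * the ReceiverInputDStream abstract class declared within Spark Streaming. In
>  * summary, the GitHubInputDStream is a DStream representation of GitHub
>  * events, implementing, i.e. overriding, the getReceiver() function that
>  * returns a Receiver[Event] object.
>  */
>
> private[streaming]
> class GitHubInputDStream(ctx: StreamingContext, storageLevel: StorageLevel)
>   extends ReceiverInputDStream[Event](ctx) {
>
>   def getReceiver(): Receiver[Event] = new GitHubReceiver(storageLevel, Client)
> }
>
> /**
>  * The GitHubReceiver class takes as a constructor argument a storageLevel :
>  * StorageLevel. It implements, i.e. overrides, two functions declared by the
>  * Receiver interface, notably onStart() and onStop(). As the names imply, the
>  * onStart() function is executed when creating DStreams, i.e. within a
>  * specified batch interval.
>  */
>
> private[streaming]
> class GitHubReceiver(storageLevel: StorageLevel, client: GitHubClient)
>   extends Receiver[Event](storageLevel) with Logging {
>
>   def onStart(): Unit = {
>     consumeEvents(new EventService(client).pagePublicEvents(0, 300))
>   }
>
>   def consumeEvents(iterator: PageIterator[Event]): Unit = iterator.hasNext match {
>     case true  => iterator.next.toList.foreach(event => store(event)); consumeEvents(iterator)
>     case false => logInfo("Processing is stopping")
>   }
>
>   def onStop(): Unit = {
>   }
> }
>
> However, when I initialise, i.e. create, the stream in the driver program on
> my local machine and apply flatMap on the DStream[Event]:
>
> val configuration = new SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]")
> val streamingContext = new StreamingContext(configuration, Seconds(5))
>
> val stream = GitHubUtils.createStream(streamingContext, StorageLevel.MEMORY_AND_DISK_SER)
>
> val timestamps = stream.flatMap(event => event.getCreatedAt.toString)
>
> and then apply functions such as reduceByKey to count e.g. the number of
> events per second, I get the following output:
>
> (T,100)
> (d,100)
> (4,100)
> (8,13)
> (6,114)
> (0,366)
>
> while the output should be of the form:
>
> (2016-26-02 00:00:01,100)
> (2016-26-02 00:00:02,100)
> (2016-26-02 00:00:03,100)
> (2016-26-02 00:00:04,13)
> (2016-26-02 00:00:05,114)
> (2016-26-02 00:00:06,366)
>
> i.e. in my output the key K is a Char.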
> The root of the problem is that when flatMap is applied to an event that is a
> serialisable object containing a member variable getCreatedAt : Date, rather
> than producing a DStream[String] it produces a DStream[Char] - meaning that
> Spark somehow splits the date String using some delimiter.
>
> I've also tried performing the computation on the timestamps by first using
> foreachRDD on the DStream of events and then using collect to get the full
> String representation of the date - and that works. However, since collect
> can be quite expensive, I am trying to avoid it and think that there must be
> a better solution to this.
>
> Therefore, my questions are: how exactly do I create from the DStream[Event]
> a DStream[String] (instead of a DStream[Char]), where each string in the
> DStream represents a timestamp from an RDD? Secondly, can someone give some
> good examples of this? And thirdly, which function is best to use if I would
> like to e.g. aggregate all events per repository ID? I.e. each Event object
> contains a getRepository() function that returns the ID : Long of the GitHub
> repository, and I would like to map each streamed event to its corresponding
> repository ID in the form of (Long, [Event]).
>
> Thanks in advance!
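Regarding the last question above about aggregating events per repository: you can map each event to a (repositoryId, event) pair and then group by key. A rough sketch, assuming getRepository returns the repository ID as a Long, as you describe:

val eventsByRepo = stream
  .map(event => (event.getRepository, event))  // DStream[(Long, Event)]
  .groupByKey()                                // DStream[(Long, Iterable[Event])]

eventsByRepo.print()

Note that groupByKey collects all events for a key within each batch; if you only need a count per repository, mapping to (event.getRepository, 1) and using reduceByKey(_ + _) is cheaper.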