Using flatmap on a string will treat it as a sequence, which is why you are
getting an RDD of char.  I think you want to just do a map instead.  Like
this

val timestamps = stream.map(event => event.getCreatedAt.toString)
On Feb 25, 2016 8:27 AM, "Dominik Safaric" <dominiksafa...@gmail.com> wrote:

> Recently, I've implemented the following Receiver and custom Spark
> Streaming
> InputDStream using Scala:
>
> /**
>  * The GitHubUtils object declares an interface consisting of overloaded
> createStream
>  * functions. The createStream function takes as arguments the ctx :
> StreamingContext
>  * passed by the driver program, along with the storageLevel :
> StorageLevel,
> returning
>  * a GitHubInputDStream. Whereas the GitHubInputDStream is a DStream
> representation,
>  * i.e. a derivation of the abstract class ReceivedInputDStream.
> */
>
> object GitHubUtils{
>
>   def createStream(ctx : StreamingContext, storageLevel: StorageLevel) :
> ReceiverInputDStream[Event] = new GitHubInputDStream(ctx,storageLevel)
>
> }
>
> /**
>  * The GitHubInputDStream class takes as constructor arguments a ctx :
> StreamingContext,
>  * and a storageLevel : StorageLevel. The class inherits from the
> ReceiverInputDStream
>  * abstract class declared within SparkStreaming. In summary, the
> GitHubInputDStream
>  * is a DStream representation of GitHub events, implementing i.e.
> overriding the
>  * getReceiver() function that returns a Receiver[Event] object.
> */
>
> private[streaming]
> class GitHubInputDStream(ctx : StreamingContext, storageLevel:
> StorageLevel)
> extends ReceiverInputDStream[Event](ctx) {
>
>  def getReceiver() : Receiver[Event] = new GitHubReceiver(storageLevel,
> Client)
>
> }
>
> /**
>  * The GitHubReceiver class takes as a constructor argument a storageLevel
> :
> StorageLevel.
>  * It implements i.e. overrides two functions declared by the Receiver
> interface, notably
>  * onStart() and onStop(). As the names imply, the onStart() function is
> executed
>  * when creating DStreams, i.e. within a specified batch interval. However,
> the onStart().
> */
>
> private[streaming]
> class GitHubReceiver(storageLevel: StorageLevel, client : GitHubClient)
> extends Receiver[Event](storageLevel) with Logging {
>
>   def onStart(): Unit = {
>     consumeEvents(new EventService(client).pagePublicEvents(0,300))
> }
>
>  def consumeEvents(iterator: PageIterator[Event]) :Unit = iterator.hasNext
> match{
>     case true => iterator.next.toList.foreach{event => store(event)};
> consumeEvents(iterator)
>     case false => logInfo("Processing is stopping")
> }
>
> def onStop(): Unit = {
>
> }
>
> However, then initialised i.e. created in the driver program on my local
> machine and applied a series of functions like e.g. flatMap on a
> DStream[Event]:
>
> val configuration = new
> SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]")
> val streamingContext = new StreamingContext(configuration, Seconds(5))
>
> val stream = GitHubUtils.createStream(streamingContext,
> StorageLevel.MEMORY_AND_DISK_SER)
>
> val timestamps = stream.flatMap(event => event.getCreatedAt.toString)
>
> and then applied a series of functions such as reduceByKey that would allow
> me to count e.g. the number of events per second, I get the following
> output:
>
> (T,100)
> (d,100)
> (4,100)
> (8,13)
> (6,114)
> (0,366)
>
> While the output should be in the form of e.g.:
>
> (2016-26-02 00:00:01,100)
> (2016-26-02 00:00:02,100)
> (2016-26-02 00:00:03,100)
> (2016-26-02 00:00:04,13)
> (2016-26-02 00:00:05,114)
> (2016-26-02 00:00:06,366)
>
> where K = Char. The root of the problem is that when flatMap is applied to
> an event that is a serialisable object containing a member variable
> getCreatedAt : Date, rather then producing a DStream[String] it produces a
> DStream[Char] - meaning that Spark somehow splits the date String using
> some
> delimiter.
>
> I've also tried to collect and perform the computation on timestamps using
> first foreachRDD on the DStream of events, and then using collect to get
> the
> full String representation of the date - and then it works. However, since
> collect can be quite expensive, I am simply trying to avoid it and hence
> think that there must be a better solution to this.
>
> Therefore, my questions are: how exactly do a create from the
> DStream[Event]
> a DStream[String] (instead of DStream[Char]), where each string in the
> DStream represents a timestamp from a RDD? Secondly, can someone give some
> good examples of this? And thirdly, which functions is at best to use if I
> would like to e.g. aggregate all events per repository ID. I.e. each Event
> object contains a getRepository() function that returns the ID : Long of
> the
> GitHub repository, and then on each streamed event belonging to a certain
> repository, I would like to map it to its corresponding repository ID in
> the
> form of (Long, [Event]).
>
> Thanks in advance!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-processing-transforming-DStreams-using-a-custom-Receiver-tp26336.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to