Recently, I've implemented the following Receiver and custom Spark Streaming
InputDStream using Scala:

/**
 * The GitHubUtils object declares an interface consisting of overloaded
 * createStream functions. The createStream function takes as arguments the
 * ctx : StreamingContext passed by the driver program, along with the
 * storageLevel : StorageLevel, and returns a GitHubInputDStream. The
 * GitHubInputDStream is a DStream representation, i.e. a derivation of the
 * abstract class ReceiverInputDStream.
 */

object GitHubUtils{

  def createStream(ctx: StreamingContext, storageLevel: StorageLevel): ReceiverInputDStream[Event] =
    new GitHubInputDStream(ctx, storageLevel)

}

/**
 * The GitHubInputDStream class takes as constructor arguments a
 * ctx : StreamingContext and a storageLevel : StorageLevel. The class inherits
 * from the ReceiverInputDStream abstract class declared within Spark Streaming.
 * In summary, the GitHubInputDStream is a DStream representation of GitHub
 * events, implementing, i.e. overriding, the getReceiver() function that
 * returns a Receiver[Event] object.
 */

private[streaming]
class GitHubInputDStream(ctx: StreamingContext, storageLevel: StorageLevel)
  extends ReceiverInputDStream[Event](ctx) {

  def getReceiver(): Receiver[Event] = new GitHubReceiver(storageLevel, Client)

}

/**
 * The GitHubReceiver class takes as constructor arguments a
 * storageLevel : StorageLevel and a client : GitHubClient. It implements, i.e.
 * overrides, two functions declared by the Receiver interface, notably
 * onStart() and onStop(). As the names imply, the onStart() function is
 * executed when creating DStreams, i.e. within a specified batch interval.
 * However, the onStart().
 */

private[streaming]
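class GitHubReceiver(storageLevel: StorageLevel, client: GitHubClient)
  extends Receiver[Event](storageLevel) with Logging {

  // Called when the receiver starts: page through the public events.
  def onStart(): Unit = {
    consumeEvents(new EventService(client).pagePublicEvents(0, 300))
  }

  // Store every event of each page until the iterator is exhausted.
  // Note: toList on the page assumes scala.collection.JavaConversions._ is in scope.
  def consumeEvents(iterator: PageIterator[Event]): Unit = iterator.hasNext match {
    case true =>
      iterator.next.toList.foreach { event => store(event) }
      consumeEvents(iterator)
    case false => logInfo("Processing is stopping")
  }

  // No cleanup is performed when the receiver stops.
  def onStop(): Unit = {

  }

}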

However, when I create the stream in the driver program on my local
machine and apply functions such as flatMap to the DStream[Event]:

val configuration = new SparkConf()
  .setAppName("StreamingSoftwareAnalytics")
  .setMaster("local[2]")
val streamingContext = new StreamingContext(configuration, Seconds(5))

val stream = GitHubUtils.createStream(streamingContext, StorageLevel.MEMORY_AND_DISK_SER)

val timestamps = stream.flatMap(event => event.getCreatedAt.toString)

and then apply functions such as reduceByKey to count, e.g., the number of
events per second, I get the following output:

(T,100)
(d,100)
(4,100)
(8,13)
(6,114)
(0,366)

whereas the output should have the form, e.g.:

(2016-26-02 00:00:01,100)
(2016-26-02 00:00:02,100)
(2016-26-02 00:00:03,100)
(2016-26-02 00:00:04,13)
(2016-26-02 00:00:05,114)
(2016-26-02 00:00:06,366)

In the actual output, the key type is K = Char. The root of the problem is
that when flatMap is applied to an event, which is a serialisable object
with a getter getCreatedAt : Date, it produces a DStream[Char] rather than a
DStream[String], as if Spark somehow splits the date String using some
delimiter.
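
To illustrate the symptom outside Spark, here is a minimal sketch using plain
Scala collections (an assumption made so that it runs without a cluster). In
Scala a String can be viewed as a collection of Char, so flatMap with a
function that returns a String flattens it into single characters, while map
keeps one String per element. As far as I understand, DStream.flatMap flattens
the collection returned by its function in the same way. The object name
FlatMapVsMap and the sample values are made up for illustration.

```scala
object FlatMapVsMap {
  // Hypothetical timestamps standing in for event.getCreatedAt.toString.
  val createdAt: Seq[String] = Seq(
    "2016-26-02 00:00:01",
    "2016-26-02 00:00:01",
    "2016-26-02 00:00:02"
  )

  // flatMap treats each returned String as a collection of Char and flattens it.
  val chars: Seq[Char] = createdAt.flatMap(ts => ts)

  // map keeps each String whole, one element per input element.
  val strings: Seq[String] = createdAt.map(ts => ts)

  // Count occurrences per timestamp; on a DStream the analogous step would be
  // map(ts => (ts, 1)) followed by reduceByKey(_ + _).
  val perSecond: Map[String, Int] =
    strings.groupBy(identity).map { case (ts, group) => (ts, group.size) }
}
```

With map, the keys of the count are whole timestamp strings rather than
single characters.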

I've also tried computing on the timestamps by first calling foreachRDD on
the DStream of events and then collect to get the full String representation
of the date, and that works. However, since collect can be quite expensive, I
would like to avoid it, and I think there must be a better solution.

Therefore, my questions are: first, how exactly do I create a DStream[String]
(instead of a DStream[Char]) from the DStream[Event], where each string in the
DStream represents a timestamp from an RDD? Secondly, can someone give some
good examples of this? And thirdly, which functions are best to use if I
would like to, e.g., aggregate all events per repository ID? Each Event
object has a getRepository() function that returns the ID : Long of the
GitHub repository, and I would like to map each streamed event belonging to a
certain repository to its repository ID, in the form of (Long, [Event]).
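
To make the desired (Long, [Event]) shape concrete, here is a rough sketch
with plain Scala collections, not Spark. RepoEvent is a hypothetical stand-in
for the GitHub Event class, with repositoryId playing the role of the value
returned by getRepository(). On a DStream I would expect the same result from
a map to (Long, Event) pairs followed by groupByKey, but that is an
assumption and untested here.

```scala
// Hypothetical stand-in for the GitHub Event class; only the fields used here.
final case class RepoEvent(repositoryId: Long, createdAt: String)

object GroupByRepository {
  val events: Seq[RepoEvent] = Seq(
    RepoEvent(1L, "2016-26-02 00:00:01"),
    RepoEvent(2L, "2016-26-02 00:00:01"),
    RepoEvent(1L, "2016-26-02 00:00:02")
  )

  // Key each event by its repository ID, giving (Long, RepoEvent) pairs.
  val keyed: Seq[(Long, RepoEvent)] = events.map(e => (e.repositoryId, e))

  // Collect all events that share a key, giving Long -> Seq[RepoEvent].
  val byRepository: Map[Long, Seq[RepoEvent]] =
    keyed.groupBy(_._1).map { case (id, pairs) => (id, pairs.map(_._2)) }
}
```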

Thanks in advance!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-processing-transforming-DStreams-using-a-custom-Receiver-tp26336.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
