Hi
Can you try this?
val lines = FlumeUtils.createStream(ssc,"localhost",9999)
// Print out the count of events received from this server in each batch
lines.count().map(cnt => "Received " + cnt + " flume events. at " +
System.currentTimeMillis() ).print()
// Also print the received events themselves
lines.foreachRDD(_.foreach(println))
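
For reference, here is a fuller sketch of the same idea as a complete application. It is only a sketch under some assumptions: the object name SparkFlumeEventPrinter and the formatCount helper are made up for illustration, the event bodies are assumed to be UTF-8 text, and the spark-streaming-flume artifact is assumed to be on the classpath. Note that RDD.foreach runs on the executors, so its println output goes to the executor logs; the sketch uses take(10) to bring a few records back to the driver so they show up in the driver console.

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical object name, chosen for this illustration only.
object SparkFlumeEventPrinter {

  // Hypothetical pure helper: builds the per-batch message so the format
  // can be checked without starting a streaming context.
  def formatCount(count: Long, nowMillis: Long): String =
    s"Received $count flume events at $nowMillis"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkFlumeEventPrinter")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Host and port are the ones used in this thread; they must match
    // the Avro sink configured in the Flume agent.
    val events = FlumeUtils.createStream(ssc, "localhost", 9999)

    // print() is an output operation, so this count is computed every batch.
    events.count()
      .map(cnt => formatCount(cnt, System.currentTimeMillis()))
      .print()

    // Decode each event body (assumed to be UTF-8 text) and show up to
    // ten records per batch on the driver console.
    events
      .map(e => new String(e.event.getBody.array(), StandardCharsets.UTF_8))
      .foreachRDD { rdd => rdd.take(10).foreach(println) }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If the count line shows up but no records do, the receiver is working and the body decoding is the place to look; if neither shows up, the batches are probably not being processed at all.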
Thanks
Arush
On Tue, Feb 17, 2015 at 11:17 AM, [email protected] <[email protected]> wrote:
> Hi,
> I am trying Spark Streaming + Flume example:
>
> 1. Code
> object SparkFlumeNGExample {
> def main(args : Array[String]) {
> val conf = new SparkConf().setAppName("SparkFlumeNGExample")
> val ssc = new StreamingContext(conf, Seconds(10))
>
> val lines = FlumeUtils.createStream(ssc,"localhost",9999)
> // Print out the count of events received from this server in each batch
> lines.count().map(cnt => "Received " + cnt + " flume events. at " +
> System.currentTimeMillis() ).print()
> ssc.start()
> ssc.awaitTermination();
> }
> }
> 2. I submit the application with the following command:
> ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master
> spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2
> --class spark.examples.streaming.SparkFlumeNGWordCount
> spark-streaming-flume.jar
>
>
> When I write data to Flume, the console only shows the following
> messages, indicating that input was added:
> storage.BlockManagerInfo: Added input-0-1424151807400 in memory on
> localhost:39338 (size: 1095.0 B, free: 267.2 MB)
> 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time
> 1424151810000 ms
> 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time
> 1424151820000 ms
> ....
> 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time
> 1424151870000 ms
>
> But I didn't see the output from the code: "Received X flume events"
>
> I have no idea where the problem is. Any ideas? Thanks
>
--
*Arush Kharbanda* || Technical Teamlead
[email protected] || www.sigmoidanalytics.com