Hi,
I am trying the Spark Streaming + Flume example:
1. Code
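import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object SparkFlumeNGExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkFlumeNGExample")
    // 10-second batch interval
    val ssc = new StreamingContext(conf, Seconds(10))
    // Receive Flume events on localhost:9999
    val lines = FlumeUtils.createStream(ssc, "localhost", 9999)
    // Print out the count of events received from this server in each batch
    lines.count().map(cnt => "Received " + cnt + " flume events. at " +
      System.currentTimeMillis()).print()
    ssc.start()
    ssc.awaitTermination()
  }
}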
2. I submit the application with the following command:
./spark-submit --deploy-mode client --name SparkFlumeEventCount \
  --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 \
  --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar
When I write data to Flume, the console only shows that input blocks are
added and that jobs are scheduled:
storage.BlockManagerInfo: Added input-0-1424151807400 in memory on
localhost:39338 (size: 1095.0 B, free: 267.2 MB)
15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time
1424151810000 ms
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time
1424151820000 ms
....
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time
1424151870000 ms
But I don't see the output from the code: "Received X flume events".
I have no idea where the problem is. Any ideas? Thanks.