Hi,
I am counting values in each window and finding the top values, and I want to save
only the 10 most frequent values of each window to HDFS rather than all the
values.
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group,
    Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))

val sortedCounts = counts.map(_.swap)
  .transform(rdd => rdd.sortByKey(false))
  .map(_.swap)

//sortedCounts.foreachRDD(rdd =>
//  println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))

sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2))
  .saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
I can print the top 10 with the commented-out foreachRDD line above.
I have also tried:
sortedCounts.foreachRDD { rdd =>
  ssc.sparkContext.parallelize(rdd.take(10))
    .saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
}
but I get the following error:
15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
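My guess (untested, an assumption on my part) is that the closure passed to foreachRDD is capturing ssc, the StreamingContext, which is not serializable. Since take(10) already returns a local array on the driver, perhaps getting the SparkContext from the RDD itself would avoid capturing ssc:

sortedCounts.foreachRDD { rdd =>
  // take(10) collects the 10 first (highest-count) records to the driver
  val top10 = rdd.take(10)
  // use the RDD's own SparkContext instead of referencing ssc in the closure,
  // so the non-serializable StreamingContext is not captured
  rdd.sparkContext.parallelize(top10)
    .saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
}

Does this look right, or is there a better way to write only the top 10 per window?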
Regards,
Laeeq