Hi,
I applied it as follows:
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
val topCounts = sortedCounts.mapPartitions(rdd => rdd.take(10))
//val topCounts = sortedCounts.transform(rdd => ssc.sparkContext.makeRDD(rdd.take(10)))
topCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
topCounts.print()
It gives the output with 10 extra values. I think take(10) is applied to each partition of the RDD rather than to the RDD as a whole. I also tried the commented code; it gives the correct result, but at startup it throws a serialisation error:
ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Output for the mapPartitions version; the low-count values after each window's top 10 look extra to me.
0,578 -3,576 4,559 -1,556 3,553 -6,540 6,538 -4,535 1,526 10,483
94,8 -113,8 -137,8 -85,8 -91,8 -121,8 114,8 108,8 93,8 101,8
1,128 -8,118 3,112 -4,110 -13,108 4,108 -3,107 -10,107 -6,106 8,105
76,6 74,6 60,6 52,6 70,6 71,6 -60,6 55,6 78,5 64,5
and so on.
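For the serialisation error: if it comes from the transform closure capturing ssc, a possible workaround (a sketch only, not tested on this job) is to take the SparkContext from the RDD itself, so the StreamingContext is never referenced inside the closure:

// Sketch: same idea as the commented transform, but using rdd.sparkContext
// instead of the outer ssc, so the closure no longer captures the
// non-serializable StreamingContext.
val topCounts = sortedCounts.transform { rdd =>
  // rdd.take(10) returns a local Array on the driver; makeRDD turns it
  // back into a one-partition RDD so it can be saved per batch.
  rdd.sparkContext.makeRDD(rdd.take(10), 1)
}
topCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))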
Regards,
Laeeq
On Tuesday, January 6, 2015 9:06 AM, Akhil Das
<[email protected]> wrote:
You can try something like:
val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
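Note that take(10) inside mapPartitions runs once per partition, so a stream whose RDDs have N partitions yields up to 10 * N elements per batch. If a single global top 10 is wanted, one sketch (assuming the stream is already globally sorted, e.g. via sortByKey) is to squeeze each RDD into one partition first:

// Sketch: coalesce to a single partition so take(10) sees the whole RDD.
// Assumes the stream is already sorted; coalesce(1) without a shuffle
// concatenates the partitions in order, preserving the global sort.
val top10Global = your_stream.transform(rdd => rdd.coalesce(1)).mapPartitions(it => it.take(10))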
Thanks
Best Regards
On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed <[email protected]>
wrote:
Hi,
I am counting the values in each window and finding the top values, and I want to save only the 10 most frequent values of each window to HDFS rather than all of the values.
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
//sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
I can print the top 10 with the commented foreachRDD line above.
I have also tried
sortedCounts.foreachRDD { rdd =>
  ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
}
but I get the following error.
15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
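A possible fix (sketch only): obtain the SparkContext from the RDD inside the closure rather than from ssc, so the closure no longer drags the non-serializable StreamingContext along:

sortedCounts.foreachRDD { rdd =>
  // rdd.take(10) is computed on the driver; parallelize it with the
  // SparkContext taken from the RDD, not from the enclosing ssc.
  rdd.sparkContext.parallelize(rdd.take(10), 1).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
}

Note that saveAsTextFile writes to the same directory on every batch and will fail once it exists, so each batch would need a distinct path (for example with the batch time appended) for this to keep running.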
Regards,
Laeeq