Hello,

I'm on my second day with Spark, and I'm having trouble getting the foreach
function to work with the network word count example. I can see that the
flatMap and map functions are being invoked, but execution never seems to
reach the function I pass to foreach. I'm not sure whether what I'm doing even
makes sense. Any help is appreciated. Thanks!
JavaDStream<String> lines = ssc.socketTextStream("localhost", 1234);

JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String x) {
        //-- this is being invoked
        return Lists.newArrayList(x.split(" "));
    }
});

JavaPairDStream<String, Integer> wordCounts = words.map(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
        //-- this is being invoked
        return new Tuple2<String, Integer>(s, 1);
    }
});

wordCounts.foreach(collectTuplesFunc);

ssc.start();
ssc.awaitTermination();
}
// The RDD type must match the stream: wordCounts is a JavaPairDStream<String, Integer>
Function<JavaPairRDD<String, Integer>, Void> collectTuplesFunc =
        new Function<JavaPairRDD<String, Integer>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, Integer> rdd) throws Exception {
        //-- this is NOT being invoked
        return null;
    }
};
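A plain-Java model of what the pipeline computes for one batch may help pin down where foreach fits. It uses no Spark classes and assumes the usual DStream model: each batch interval produces one RDD, and the function passed to foreach is called once per batch on the driver, only after ssc.start(). The names countBatch and runBatches are hypothetical, and the per-key sum stands in for reduceByKey, which the code above does not call (so as posted, every pair is simply (word, 1)).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of the word-count pipeline; no Spark classes are used.
public class WordCountBatchSketch {

    // One batch: flatMap (line -> words), map (word -> (word, 1)),
    // then a per-key sum standing in for reduceByKey.
    static Map<String, Integer> countBatch(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {                   // flatMap step
            for (String word : line.split(" ")) {
                if (word.isEmpty()) {
                    continue;                         // ignore runs of spaces
                }
                counts.merge(word, 1, Integer::sum);  // map to (word, 1) and sum
            }
        }
        return counts;
    }

    // Hypothetical driver loop: the output callback plays the role of the
    // function passed to foreach and is called once per batch.
    static void runBatches(List<List<String>> batches,
                           Consumer<Map<String, Integer>> output) {
        for (List<String> batch : batches) {
            output.accept(countBatch(batch));
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a b a", "b c"),
                Arrays.asList("c c"));
        List<Map<String, Integer>> seen = new ArrayList<>();
        runBatches(batches, seen::add);
        System.out.println(seen); // [{a=2, b=2, c=1}, {c=2}]
    }
}
```

In real Spark code nothing in the pipeline runs until an output operation such as foreach is registered and the context is started.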
I'm assuming that the foreach call is where I would write to an external
system. Please correct me if this assumption is wrong.

Thanks again
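For reference, foreach is an output operation (later Spark releases call it foreachRDD), and the Spark Streaming guide's suggested pattern for external writes is to call foreachPartition on each batch's RDD so that one connection is opened per partition rather than per record. Below is a plain-Java model of that pattern, not Spark code: a list of partitions stands in for the RDD, and Sink, Recorder, pair and writeBatch are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of "foreachRDD + foreachPartition"; no Spark classes are used.
public class PerPartitionWriteSketch {

    // Hypothetical connection to an external system (database, queue, ...).
    interface Sink extends AutoCloseable {
        void write(String key, int value);

        @Override
        void close();
    }

    // Hypothetical stand-in for the external system: records writes and
    // counts how many connections were opened.
    static class Recorder {
        int connectionsOpened = 0;
        final List<String> written = new ArrayList<>();

        Sink open() {
            connectionsOpened++;
            return new Sink() {
                @Override
                public void write(String key, int value) {
                    written.add(key + "=" + value);
                }

                @Override
                public void close() {
                    // nothing to release in this in-memory model
                }
            };
        }
    }

    static Map.Entry<String, Integer> pair(String key, int value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // One connection per partition, reused for every record in that partition.
    static void writeBatch(List<List<Map.Entry<String, Integer>>> partitions,
                           Supplier<Sink> openConnection) {
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            try (Sink sink = openConnection.get()) {
                for (Map.Entry<String, Integer> record : partition) {
                    sink.write(record.getKey(), record.getValue());
                }
            }
        }
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> batch = Arrays.asList(
                Arrays.asList(pair("a", 2), pair("b", 2)),
                Arrays.asList(pair("c", 1)));
        Recorder recorder = new Recorder();
        writeBatch(batch, recorder::open);
        System.out.println(recorder.connectionsOpened + " " + recorder.written); // 2 [a=2, b=2, c=1]
    }
}
```

Opening the connection inside the per-partition loop mirrors why the guide avoids creating it on the driver: a connection object generally cannot be serialized and shipped to the executors.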