Hi Team,
I am new to Spark Streaming. I am trying to write a Spark Streaming
application in which the calculation on the incoming data is performed in R
within each micro-batch.
I want wordCounts.mapToPair to run in parallel, where wordCounts is the output
of groupByKeyAndWindow, so that RUtilMethods.sum(inputToR) is invoked in
parallel. How can I ensure this parallelism?
Note: I cannot use reduceByKey or combineByKey, because calling R multiple
times would add significant overhead.
Thanks!!!!
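
To make the note about overhead concrete, here is a minimal plain-Java sketch (no Spark, no R) that counts how often an expensive function is invoked under a pairwise reduce versus one call per grouped key. Everything in it is illustrative: expensiveSum is a hypothetical stand-in for RUtilMethods.sum, and the input words are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class CallCountSketch {
    // Counts invocations of the (hypothetical) expensive function.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for RUtilMethods.sum: each call is "expensive".
    static int expensiveSum(int[] values) {
        CALLS.incrementAndGet();
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    // Turn words into (word, [1, 1, ...]) groups, as a group-by-key would.
    static Map<String, List<Integer>> group(List<String> words) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (String w : words) {
            grouped.computeIfAbsent(w, k -> new ArrayList<>()).add(1);
        }
        return grouped;
    }

    // Reduce style: the function combines two values at a time,
    // so a key with n values costs n - 1 calls.
    static Map<String, Integer> pairwise(Map<String, List<Integer>> grouped) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            List<Integer> values = e.getValue();
            int acc = values.get(0);
            for (int i = 1; i < values.size(); i++) {
                acc = expensiveSum(new int[] {acc, values.get(i)});
            }
            out.put(e.getKey(), acc);
        }
        return out;
    }

    // Grouped style: all values of a key are handed over in a single call.
    static Map<String, Integer> perKey(Map<String, List<Integer>> grouped) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int[] input = e.getValue().stream().mapToInt(Integer::intValue).toArray();
            out.put(e.getKey(), expensiveSum(input));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> grouped = group(List.of("a", "b", "a", "a", "b", "a"));

        CALLS.set(0);
        Map<String, Integer> reduced = pairwise(grouped);
        System.out.println("pairwise: " + reduced + ", calls = " + CALLS.get());

        CALLS.set(0);
        Map<String, Integer> summed = perKey(grouped);
        System.out.println("per key:  " + summed + ", calls = " + CALLS.get());
    }
}
```

Both styles produce the same counts; the difference is only in how many times the expensive function runs, which is why grouping first and calling R once per key is attractive when each call carries a fixed cost.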
////////////////Code Sample////////////////////
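import java.util.List;
import java.util.regex.Pattern;

import com.google.common.collect.Lists;
// Assumed library versions: Commons Collections 3.x and Commons Lang 2.x.
// Use org.apache.commons.collections4 / org.apache.commons.lang3 if those are on the classpath.
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

// RUtilMethods is my own helper class that calls into R (not shown here).
public final class JavaNetworkWordCount {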
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        // Create the context with a 10-second batch interval
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
        sparkConf.setMaster("local[4]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
                Durations.seconds(10));
        // Create a JavaReceiverInputDStream on the target ip:port and count the
        // words in the input stream of \n-delimited text (e.g. generated by 'nc').
        // Note: the storage level has no replication because this runs locally;
        // replication is necessary in a distributed scenario for fault tolerance.
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost",
                10000, StorageLevels.MEMORY_AND_DISK_SER);
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });
        JavaPairDStream<String, Iterable<Integer>> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).groupByKeyAndWindow(Durations.seconds(60));
        JavaPairDStream<String, Integer> wordCounts1 = wordCounts.mapToPair(
                new PairFunction<Tuple2<String, Iterable<Integer>>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Iterable<Integer>> data)
                            throws Exception {
                        // Copy the grouped values into an int[] and pass them to R in one call
                        List<Integer> it = IteratorUtils.toList(data._2.iterator());
                        int[] inputToR = ArrayUtils.toPrimitive(it.toArray(new Integer[0]));
                        it = null;
                        Runtime.getRuntime().gc();
                        return new Tuple2<String, Integer>(data._1,
                                RUtilMethods.sum(inputToR));
                    }
                });
        wordCounts1.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
///////////////////////////////////////////////////////////
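
The parallelism being asked about can be pictured without Spark. Once the values are grouped, each key's group is an independent unit of work, so the per-key calls can run concurrently up to the number of available workers. The sketch below is plain Java under stated assumptions: expensiveSum is a hypothetical stand-in for RUtilMethods.sum, and the workers argument is an illustrative knob, not a Spark setting. In Spark itself the closest counterpart is, as far as I understand it, the number of partitions of the grouped stream (groupByKeyAndWindow has overloads that take a partition count), bounded by the cores available to tasks; with local[4], one thread is occupied by the socket receiver.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelPerKeySketch {
    // Hypothetical stand-in for RUtilMethods.sum: one expensive call per key.
    static int expensiveSum(int[] values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    // Submit one task per key to a fixed pool; at most `workers` tasks run at once.
    static Map<String, Integer> sumPerKey(Map<String, List<Integer>> grouped, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            Map<String, Future<Integer>> pending = new TreeMap<>();
            for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
                // Copy the group's values into an int[] before handing them over.
                int[] input = e.getValue().stream().mapToInt(Integer::intValue).toArray();
                Callable<Integer> task = () -> expensiveSum(input);
                pending.put(e.getKey(), pool.submit(task));
            }
            // Collect the results; get() blocks until that key's task has finished.
            Map<String, Integer> result = new TreeMap<>();
            for (Map.Entry<String, Future<Integer>> e : pending.entrySet()) {
                result.put(e.getKey(), e.getValue().get());
            }
            return result;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException("a per-key task failed", ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        grouped.put("a", List.of(1, 1, 1, 1));
        grouped.put("b", List.of(1, 1));
        grouped.put("c", List.of(1));
        System.out.println(sumPerKey(grouped, 4));
    }
}
```

The point of the sketch is that parallelism comes from how many independent groups can be worked on at the same time, not from the mapping function itself; if all keys end up in one partition (or one worker), the calls run one after another.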