Since mapToPair will be called on each record, and the number of records
can be in the tens of millions, you probably do not want to run all of them
in parallel. So think about your strategy here.

In general, the parallelism can be controlled by setting the number of
partitions in the groupByKey operation, as in the sketch below.
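
For example, groupByKeyAndWindow has an overload that also takes a
partition count, and each partition is processed by one task per batch, so
the partition count caps how many calls into R can run concurrently. A
minimal sketch against your snippet (the count of 8 is only illustrative,
and "pairs" stands for the output of your first mapToPair):

    // Illustrative: with 8 partitions, at most 8 tasks (and hence at most
    // 8 concurrent invocations of R) process this stream per batch.
    JavaPairDStream<String, Iterable<Integer>> wordCounts =
            pairs.groupByKeyAndWindow(Durations.seconds(60), 8);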

On Mon, Dec 14, 2015 at 10:44 PM, Rabin Banerjee <
rabin.baner...@ericsson.com> wrote:

> Hi Team,
>
> I am new to Spark Streaming, and I am trying to write a Spark Streaming
> application in which the calculation on the incoming data is performed in
> "R" within each micro-batch.
>
> *But I want to make wordCounts.mapToPair parallel, where wordCounts is the
> output of groupByKey. How can I ensure that wordCounts.mapToPair runs
> fully in parallel, so that RUtilMethods.sum(inputToR) is invoked in
> parallel?*
>
> How can I ensure the above parallelism?
>
> Note: I cannot use reduceByKey or combineByKey, as calling R multiple
> times would be a significant overhead.
>
> Thanks!
>
>
>
> //////////////// Code Sample ////////////////////
>
> import java.util.List;
> import java.util.regex.Pattern;
>
> import org.apache.commons.collections.IteratorUtils;
> import org.apache.commons.lang3.ArrayUtils;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.StorageLevels;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> import com.google.common.collect.Lists;
>
> import scala.Tuple2;
>
> public final class JavaNetworkWordCount {
>
>     private static final Pattern SPACE = Pattern.compile(" ");
>
>     public static void main(String[] args) {
>
>         // Create the context with a 10 second batch interval.
>         SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
>         sparkConf.setMaster("local[4]");
>         JavaStreamingContext ssc =
>                 new JavaStreamingContext(sparkConf, Durations.seconds(10));
>
>         // Create a JavaReceiverInputDStream on the target ip:port and count
>         // the words in the input stream of \n-delimited text (e.g. generated
>         // by 'nc'). Note that a storage level without replication is fine
>         // only when running locally; replication is necessary in a
>         // distributed scenario for fault tolerance.
>         JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
>                 "localhost", 10000, StorageLevels.MEMORY_AND_DISK_SER);
>
>         // Split each line into words.
>         JavaDStream<String> words = lines.flatMap(
>                 new FlatMapFunction<String, String>() {
>                     @Override
>                     public Iterable<String> call(String x) {
>                         return Lists.newArrayList(SPACE.split(x));
>                     }
>                 });
>
>         // Pair each word with a count of 1, then group the counts per word
>         // over a 60 second window.
>         JavaPairDStream<String, Iterable<Integer>> wordCounts = words
>                 .mapToPair(new PairFunction<String, String, Integer>() {
>                     @Override
>                     public Tuple2<String, Integer> call(String s) {
>                         return new Tuple2<String, Integer>(s, 1);
>                     }
>                 })
>                 .groupByKeyAndWindow(Durations.seconds(60));
>
>         // Hand all counts for a word to R in a single call;
>         // RUtilMethods.sum performs the summation in R.
>         JavaPairDStream<String, Integer> wordCounts1 = wordCounts.mapToPair(
>                 new PairFunction<Tuple2<String, Iterable<Integer>>, String, Integer>() {
>                     @Override
>                     public Tuple2<String, Integer> call(
>                             Tuple2<String, Iterable<Integer>> data) throws Exception {
>                         List<Integer> it = IteratorUtils.toList(data._2.iterator());
>                         int[] inputToR = ArrayUtils.toPrimitive(it.toArray(new Integer[0]));
>                         return new Tuple2<String, Integer>(data._1,
>                                 RUtilMethods.sum(inputToR));
>                     }
>                 });
>
>         wordCounts1.print();
>         ssc.start();
>         ssc.awaitTermination();
>     }
> }
> ///////////////////////////////////////////////////////////