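No, in general you can't create new RDDs in code running on the executors. The SparkContext lives only on the driver and isn't serializable, which is most likely where your serialization exception comes from. Your properties file looks like a constant, so why not process it once at the beginning of the job and broadcast the result?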
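Roughly what I have in mind, as an untested sketch rather than a drop-in fix: parse the file once on the driver into a plain map, broadcast that map, and look values up inside your PairFunction instead of calling jsc.textFile there. The FeatureList class and its parse method are names I made up for illustration, and I'm assuming the file is readable from the driver. The Spark calls are shown only as comments so the helper compiles on its own.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Spark): turns "key=value" lines into a map.
// It is meant to run once on the driver, before the streaming job starts.
final class FeatureList {
    private FeatureList() {}

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> features = new HashMap<>();
        for (String line : lines) {
            int eq = line.indexOf('=');
            if (eq <= 0) {
                continue; // skip blank lines and lines without a key
            }
            // Split at the first '=' only, so values may themselves contain '='.
            features.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
        }
        return features;
    }
}

// Driver-side usage (assumed; needs the Spark imports and your existing jsc):
//
//   Map<String, String> features =
//       FeatureList.parse(Files.readAllLines(Paths.get(propertiesFile)));
//   final Broadcast<Map<String, String>> featuresBc = jsc.broadcast(features);
//
// Then, inside call(String urlString) on the executors, read from
// featuresBc.value() and do not touch jsc at all.
```

The broadcast value is shipped to each executor once and is read-only there, which fits a lookup table that never changes during the job.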
On Fri, Sep 11, 2015 at 2:09 PM, Rachana Srivastava <rachana.srivast...@markmonitor.com> wrote:

> Hello all,
>
> Can we invoke JavaRDD while processing stream from Kafka for example.
> Following code is throwing some serialization exception. Not sure if this
> is feasible.
>
> JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));
>
> JavaPairReceiverInputDStream<String, String> messages =
>     KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
>
> JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>     public String call(Tuple2<String, String> tuple2) {
>         return tuple2._2();
>     }
> });
>
> JavaPairDStream<String, String> wordCounts = lines.mapToPair(new PairFunction<String, String, String>() {
>     public Tuple2<String, String> call(String urlString) {
>         String propertiesFile = "/home/cloudera/Desktop/sample/input/featurelist.properties";
>         JavaRDD<String> propertiesFileRDD = jsc.textFile(propertiesFile);
>         JavaPairRDD<String, String> featureKeyClassPair = propertiesFileRDD.mapToPair(
>             new PairFunction<String, String, String>() {
>                 public Tuple2<String, String> call(String property) {
>                     return new Tuple2(property.split("=")[0], property.split("=")[1]);
>                 }
>             });
>         featureKeyClassPair.count();
>         return new Tuple2<String, String>(urlString, featureScore);
>     }
> });