It is not reflection that is the issue here but use of an RDD transformation "featureKeyClassPair.map" inside "lines.mapToPair".
>From the code snippet you have sent it is not very clear if getFeatureScore(id,data) invokes executeFeedFeatures, but if that is the case it is not very obvious that “data” is a supposed to be huge and thus need to be PairRDD and if it is not you do not need to use the JavaPairRDD<String, String>, instead use a Map<String, String> and return a List<Double>. If it data is huge and has to be PairRDD pull out the logic to build the data PairRDD and then invoke map function on that RDD. - Ankur On Mon, Sep 14, 2015 at 12:43 PM, <rachana.srivast...@thomsonreuters.com> wrote: > Thanks so much Ajay and Ankur for your input. > > > > What we are trying to do is following: I am trying to invoke a class > using Java reflection to get the result > > > > *THIS WORKS FINE* > > public static void main(String[] args) throws Exception { > > final JavaSparkContext jsc = new JavaSparkContext(sparkConf); > > ... > > METHOD THAT I AM TRYING TO INVOKE USING REFLECTION > > JavaPairDStream<String, String> urlFeatureScore = lines.mapToPair( new > PairFunction<String, String, String>() { > > public Tuple2<String, String> call(final String urlString) throws > Exception { > > String featureScore = getFeatureScore(id,data); > > return new Tuple2<String, String>(urlString, featureScore); > > } > > }); > > ... > > *REPLACED WITH METHOD INVOKED USING REFLECTION DOES NOT WORK ERROR MESSAGE > BELOW.* > > > executeFeedFactories2(featureClassName, featureParam, featureData) > > jssc.start(); > > jssc.awaitTermination(); > > } > > > > *Splitting the same work to Class using Reflection does not work:* > > > > private static JavaRDD<Double> executeFeedFactories2(String > featureClassName, Map<String, String> featureParam,JavaPairRDD<String, > String> featureData) throws Exception { > > Class featureClass = Class.forName(MyClass); > > Method m = featureClass.getMethod("executeFeedFeatures", > Map.class, JavaPairRDD.class); > > JavaRDD<Double> score = ( JavaRDD<Double> ) > m.invoke(featureClass.newInstance(), featureParam,featureData); > > return score; > > } > > > > public class MyClass{ > > public static JavaRDD<Double> executeFeedFeatures(*Map* > featureParamMap,JavaPairRDD<String, > String> featureKeyClassPair ){ > > featureScoreRDD = featureKeyClassPair.map(new Function<Tuple2<String, > String>, Double>() { > > public Double call(Tuple2<String, String> keyValue) { > > … > > } > > }); > > return featureScoreRDD; > > } > > > > } > > > > Thanks again for all your help and advice. > > > > Regards, > > > > Rachana > > > > *From:* Ajay Singal [mailto:asinga...@gmail.com] > *Sent:* Monday, September 14, 2015 12:20 PM > *To:* Rachana Srivastava; Ankur Srivastava > *Cc:* u...@spark.apache.org; dev@spark.apache.org; Ajay Singal > *Subject:* Re: JavaRDD using Reflection > > > > Hello Rachana, > > > > The easiest way would be to start with creating a 'parent' JavaRDD and run > different filters (based on different input arguments) to create respective > 'child' JavaRDDs dynamically. > > > > Notice that the creation of these children RDDs is handled by the > application driver. > > > > Hope this helps! > > Ajay > > > > On Mon, Sep 14, 2015 at 1:21 PM, Ankur Srivastava < > ankur.srivast...@gmail.com> wrote: > > Hi Rachana > > > > I didn't get you r question fully but as the error says you can not > perform a rdd transformation or action inside another transformation. In > your example you are performing an action "rdd2.values.count()" in side > the "map" transformation. It is not allowed and in any case this will be > very inefficient too. > > > > you should do something like this: > > > > final long rdd2_count = rdd2.values.count() > > rdd1.map(x => rdd2_count * x) > > > > Hope this helps!! > > > > - Ankur > > > > On Mon, Sep 14, 2015 at 9:54 AM, Rachana Srivastava < > rachana.srivast...@markmonitor.com> wrote: > > Hello all, > > > > I am working a problem that requires us to create different set of JavaRDD > based on different input arguments. We are getting following error when we > try to use a factory to create JavaRDD. Error message is clear but I am > wondering is there any workaround. > > > > *Question: * > > How to create different set of JavaRDD based on different input arguments > dynamically. Trying to implement something like factory pattern. > > > > *Error Message: * > > RDD transformations and actions can only be invoked by the driver, not > inside of other transformations; for example, rdd1.map(x => > rdd2.values.count() * x) is invalid because the values transformation and > count action cannot be performed inside of the rdd1.map transformation. For > more information, see SPARK-5063. > > > > Thanks, > > > > Rachana > > > > >