I am getting a Spark "Task not serializable" exception when running spark-submit in standalone mode. I am using Spark Streaming with Kafka queues as the source, but the mapping operations on the RDDs from the stream fail. The code where the serialization exception occurs is shown below.

I have a separate class to manage the contexts, with the respective getters and setters:
public class Contexts {
    public RedisContext rc = null;
    public SparkContext sc = null;
    public Gson serializer = new Gson();
    public SparkConf sparkConf = null; // new SparkConf().setAppName("SparkStreamEventProcessingEngine");
    public JavaStreamingContext jssc = null; // new JavaStreamingContext(sparkConf, new Duration(2000));
    public Producer<String, String> kafkaProducer = null;
    public Tuple2<String, Object> hostTup = null;

    // getters and setters for each field follow (omitted)
}


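For reference, the getters used below are plain accessors along these lines (abridged sketch; only the ones referenced in the next class):

public JavaStreamingContext getJssc() { return jssc; }
public RedisContext getRc() { return rc; }
public SparkContext getSc() { return sc; }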

The class with the main Spark Streaming processing logic is as follows:
public final class SparkStreamEventProcessingEngine {
    public Contexts contexts = new Contexts();

    public SparkStreamEventProcessingEngine() {
    }

    public static void main(String[] args) {
        SparkStreamEventProcessingEngine temp = new SparkStreamEventProcessingEngine();
        temp.tempfunc();
    }

    private void tempfunc() {
        System.out.println(contexts.getJssc().toString() + "\n" + contexts.getRc().toString() + "\n" + contexts.getSc().toString() + "\n");
        createRewardProducer();

        Properties props = new Properties();
        try {
            props.load(SparkStreamEventProcessingEngine.class.getResourceAsStream("/application.properties"));
        } catch (IOException e) {
            System.out.println("Error loading application.properties file");
            return;
        }

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(props.getProperty("kafa.inbound.queue"), 1);
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(contexts.getJssc(), props.getProperty("kafka.zookeeper.quorum"),
                        props.getProperty("kafka.consumer.group"), topicMap);

        // The exception occurs at this line:
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            // private static final long serialVersionUID = 1L;

            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                rdd.foreach(new VoidFunction<String>() {
                    public void call(String stringData) throws Exception {
                        Gson serializer = new Gson();
                        OfferRedeemed event = serializer.fromJson(stringData, OfferRedeemed.class);
                        System.out.println("Incoming Event:" + event.toString());
                        processTactic(event, "51367");
                        processTactic(event, "53740");
                    }
                });
                return null;
            }
        });

        contexts.getJssc().start();
        contexts.getJssc().awaitTermination();
    }

    private void processTactic(OfferRedeemed event, String tacticId) {
        System.out.println(contexts.getRc().toString() + "hi4");

        TacticDefinition tactic = readTacticDefinition(tacticId);
        boolean conditionMet = false;
        if (tactic != null) {
            System.out.println("Evaluating event of type :" + event.getEventType() + " for Tactic : " + tactic.toString());
            ...

...and so on for the respective functionality.
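As far as I understand Java inner-class semantics, the anonymous Function passed to map is a non-static inner class, so the compiler gives it a hidden reference to the enclosing SparkStreamEventProcessingEngine instance. Roughly, it desugars to something like this (illustrative sketch, not my actual code; the real generated field is the this$0 named in the serialization stack below):

import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

// Approximation of what javac generates for the anonymous map function.
final class GeneratedMapFunction implements Function<Tuple2<String, String>, String> {
    // Hidden reference to the enclosing instance (the generated field is this$0).
    // Serializing the function therefore tries to serialize the whole engine object.
    private final SparkStreamEventProcessingEngine enclosing;

    GeneratedMapFunction(SparkStreamEventProcessingEngine enclosing) {
        this.enclosing = enclosing;
    }

    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
}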

The exception thrown is as follows:


Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
        at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
        at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157)
        at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:43)
        at com.coupons.stream.processing.SparkStreamEventProcessingEngine.tempfunc(SparkStreamEventProcessingEngine.java:366)
        at com.coupons.stream.processing.SparkStreamEventProcessingEngine.main(SparkStreamEventProcessingEngine.java:346)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.coupons.stream.processing.SparkStreamEventProcessingEngine
Serialization stack:
        - object not serializable (class: com.coupons.stream.processing.SparkStreamEventProcessingEngine, value: com.coupons.stream.processing.SparkStreamEventProcessingEngine@6a48a7f3)
        - field (class: com.coupons.stream.processing.SparkStreamEventProcessingEngine$1, name: this$0, type: class com.coupons.stream.processing.SparkStreamEventProcessingEngine)
        - object (class com.coupons.stream.processing.SparkStreamEventProcessingEngine$1, com.coupons.stream.processing.SparkStreamEventProcessingEngine$1@1c6c6f24)
        - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
        - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
        ... 23 more
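If the implicit this$0 capture is indeed the cause, would extracting the mapping function into a static nested class be the right fix? A static nested class carries no reference to the enclosing instance, so only the function itself should be serialized. Untested sketch (declared inside SparkStreamEventProcessingEngine; class names are mine):

    // Untested sketch: no hidden this$0 field, so serialization stops here.
    private static class ExtractMessageValue implements Function<Tuple2<String, String>, String> {
        private static final long serialVersionUID = 1L;

        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2(); // keep only the Kafka message value
        }
    }

    // usage in tempfunc():
    // JavaDStream<String> lines = messages.map(new ExtractMessageValue());

I assume the same applies to the Function and VoidFunction passed to foreachRDD, since they call the instance method processTactic and would capture this for the same reason?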

Any help on this is appreciated.