For prototyping purposes, I created a test program that injects dependencies using
Spring.
Nothing fancy — it is just a rewrite of KafkaDirectWordCount. When I run
it, I get the following exception:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157)
    at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:43)
    at com.olacabs.spark.examples.WordCountProcessorKafkaImpl.process(WordCountProcessorKafkaImpl.java:45)
    at com.olacabs.spark.examples.WordCountApp.main(WordCountApp.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
Serialization stack:
    - object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@7add323c)
    - field (class: com.olacabs.spark.examples.WordCountProcessorKafkaImpl, name: streamingContext, type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
    - object (class com.olacabs.spark.examples.WordCountProcessorKafkaImpl, com.olacabs.spark.examples.WordCountProcessorKafkaImpl@29a1505c)
    - field (class: com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1, name: this$0, type: class com.olacabs.spark.examples.WordCountProcessorKafkaImpl)
    - object (class com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1, com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1@c6c82aa)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 23 more
Can someone help me figure out why?
Here is the code:
public interface EventProcessor extends Serializable {
    void process();
}

public class WordCountProcessorKafkaImpl implements EventProcessor {

    private static final Pattern SPACE = Pattern.compile(" ");

    @Autowired
    @Qualifier("streamingContext")
    JavaStreamingContext streamingContext;

    @Autowired
    @Qualifier("inputDStream")
    JavaPairInputDStream<String, String> inputDStream;

    @Override
    public void process() {
        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = inputDStream.map(
                new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });

        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public Iterable<String> call(String x) {
                        return Lists.newArrayList(SPACE.split(x));
                    }
                });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        wordCounts.print();

        // Start the computation
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
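For what it's worth, my reading of the serialization stack is that the anonymous `Function` passed to `map` keeps an implicit `this$0` reference to the enclosing `WordCountProcessorKafkaImpl`, which in turn drags in the non-serializable `streamingContext` field. A minimal, Spark-free sketch of that capture behavior using plain JDK serialization (all class names here are hypothetical, for illustration only):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Stand-in for the non-serializable JavaStreamingContext field.
    private final Object context = new Object();

    // An anonymous inner class that reads an outer field: the compiler adds an
    // implicit this$0 reference to the enclosing CaptureDemo, so serializing it
    // also tries to serialize the non-serializable context field.
    Serializable capturingFunction() {
        return new Serializable() {
            @Override
            public String toString() {
                return "ctx=" + context; // forces capture of the outer instance
            }
        };
    }

    // A static nested class: no hidden outer reference, serializes cleanly.
    static class StandaloneFunction implements Serializable {
        public String call(String s) {
            return s;
        }
    }

    // Returns true if Java serialization of o succeeds, false otherwise.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // e.g. java.io.NotSerializableException: CaptureDemo
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new CaptureDemo().capturingFunction())); // false
        System.out.println(serializes(new StandaloneFunction()));              // true
    }
}
```

If that reading is right, moving the anonymous functions into static nested classes (or otherwise avoiding references to the enclosing instance inside the closures) would be one way out, but I'd like confirmation.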