Hi,
I am using Spark Streaming to consume data from Kafka 0.8, with checkpointing enabled on HDFS. I am getting the following error:
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serialisable
- field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1, name: foreachFunc$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@333c4112)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@333c4112))
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream, org.apache.spark.streaming.kafka.KafkaInputDStream@5f989b88)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream, org.apache.spark.streaming.kafka.KafkaInputDStream@36f6bc85)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
Inside foreachRDD on the DStream, I use code like this:
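// dstream: the JavaPairDStream<String, String> read from Kafka
dstream.foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Void call(JavaPairRDD<String, String> v1) throws Exception {
            // Bring the batch's key/value pairs back to the driver as a local map
            Map<String, String> localMap = v1.collectAsMap();
            myfunction(localMap); // writes the map contents to a file
            return null; // a Function<..., Void> must still return a value
        }
    });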
myfunction writes the contents of the local map to a file.
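The enclosing class and myfunction are not shown, so the following is only a plain-Java sketch (no Spark needed) of one common cause of a trace like the one above. It assumes myfunction is an instance method of a class that does not implement Serializable. An anonymous inner class that calls an instance method keeps a hidden reference to the enclosing object, so serializing the function (as DStream checkpointing does) also tries to serialize that object. SerializableFunction is a hypothetical stand-in for org.apache.spark.api.java.function.Function (which also extends java.io.Serializable); StreamingDriver, MapFileWriter and the tab-separated file format are illustrative names only, not Spark APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SerializableFunctionDemo {
    // Hypothetical stand-in for Spark's Java Function interface (also Serializable).
    public interface SerializableFunction<T, R> extends Serializable {
        R call(T input) throws Exception;
    }

    // Hypothetical driver class that, like many driver classes, is NOT Serializable.
    public static class StreamingDriver {
        private final Path outputFile;

        public StreamingDriver(Path outputFile) {
            this.outputFile = outputFile;
        }

        // Instance method, mirroring myfunction(...) from the post.
        void myfunction(Map<String, String> localMap) throws IOException {
            MapFileWriter.writeMap(localMap, outputFile);
        }

        // The anonymous class calls an instance method, so it holds a hidden
        // reference to StreamingDriver.this and serializing it fails.
        public SerializableFunction<Map<String, String>, Void> anonymousFunction() {
            return new SerializableFunction<Map<String, String>, Void>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Void call(Map<String, String> localMap) throws Exception {
                    myfunction(localMap);
                    return null;
                }
            };
        }
    }

    // Static nested class: no enclosing instance and only a String field,
    // so Java serialization succeeds.
    public static final class MapFileWriter implements SerializableFunction<Map<String, String>, Void> {
        private static final long serialVersionUID = 1L;
        private final String outputPath; // Path itself is not Serializable

        public MapFileWriter(String outputPath) {
            this.outputPath = outputPath;
        }

        @Override
        public Void call(Map<String, String> localMap) throws IOException {
            writeMap(localMap, Paths.get(outputPath));
            return null;
        }

        // Writes one "key<TAB>value" line per entry (illustrative format).
        static void writeMap(Map<String, String> localMap, Path file) throws IOException {
            List<String> lines = new ArrayList<>();
            for (Map.Entry<String, String> e : localMap.entrySet()) {
                lines.add(e.getKey() + "\t" + e.getValue());
            }
            Files.write(file, lines, StandardCharsets.UTF_8);
        }
    }

    // True if plain Java serialization of the object succeeds.
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("localmap", ".txt");
        tmp.toFile().deleteOnExit();
        Map<String, String> localMap = new TreeMap<>();
        localMap.put("k1", "v1");
        localMap.put("k2", "v2");

        System.out.println(isSerializable(new StreamingDriver(tmp).anonymousFunction())); // false
        System.out.println(isSerializable(new MapFileWriter(tmp.toString()))); // true

        new MapFileWriter(tmp.toString()).call(localMap);
        System.out.println(Files.readAllLines(tmp, StandardCharsets.UTF_8));
    }
}
```

The static class carries only a String path, so nothing outside it is pulled into serialization. Whether this matches the failure here depends on code the post does not show.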
Thanks
Sandesh