Hi,

I am trying to write data produced by the Kafka command-line producer
for a topic into HBase using Spark Streaming, but I am stuck on an error
and unable to proceed. Below is my code, which I package as a jar and run
through spark-submit. Am I doing something wrong inside foreachRDD()?
What is wrong at the SparkKafkaDemo$2.call(SparkKafkaDemo.java:63) line
(the mapToPair call, judging by the stack trace) in the error message
below?

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaKafkaDemo")
        .setMaster("local")
        .setSparkHome("/Users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");
// Create the context with a 5 second batch interval
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(5000));
int numThreads = 2;
Map<String, Integer> topicMap = new HashMap<String, Integer>();
// topicMap.put("viewTopic", numThreads);
topicMap.put("nonview", numThreads);
JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jsc, "localhost", "ViewConsumer", topicMap);
JavaDStream<String> lines = messages.map(
        new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = stringJavaRDD.mapToPair(
                new PairFunction<String, ImmutableBytesWritable, Put>() {
                    @Override
                    public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {
                        Put put = new Put(Bytes.toBytes("Rowkey" + Math.random()));
                        put.addColumn(Bytes.toBytes("firstFamily"),
                                Bytes.toBytes("firstColumn"),
                                Bytes.toBytes(line + "fc"));
                        return new Tuple2<ImmutableBytesWritable, Put>(
                                new ImmutableBytesWritable(), put);
                    }
                });
        // save to HBase - Spark built-in API method
        hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
        return null;
    }
});
jsc.start();
jsc.awaitTermination();

I see the error below when I run the jar with spark-submit:

./bin/spark-submit --class "SparkKafkaDemo" --master local \
    /Users/kvk/IntelliJWorkspace/HbaseDemo/HbaseDemo.jar

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)
    at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)
    at SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)
    at SparkKafkaDemo$2.call(SparkKafkaDemo.java:60)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
    at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
    at org.apache.hadoop.mapreduce.Job.toString(Job.java:452)
    at java.lang.String.valueOf(String.java:2847)
    at java.lang.StringBuilder.append(StringBuilder.java:128)
    at scala.StringContext.standardInterpolator(StringContext.scala:122)
    at scala.StringContext.s(StringContext.scala:90)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:103)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 24 more
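
One possible reading of the trace, offered as an unconfirmed assumption
rather than a diagnosis: the "Caused by" section shows Spark's
SerializationDebugger calling Job.toString(), which suggests that a Hadoop
Job object (perhaps newAPIJobConfiguration1) is reachable from the
PairFunction passed to mapToPair. An anonymous inner class created inside
an instance method can keep a hidden reference to its enclosing object, so
serializing the inner function may also try to serialize everything the
outer function holds. The sketch below reproduces only that mechanism with
plain Java serialization. FakeJob, Task, Outer and StaticTask are
hypothetical stand-ins, not Spark or Hadoop classes, and the inner class
reads an outer field so that the hidden reference exists on any compiler.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureCaptureDemo {

    // Hypothetical stand-in for a non-serializable object such as a Hadoop Job.
    static class FakeJob { }

    // Hypothetical stand-in for a Spark function interface (those extend Serializable).
    interface Task extends Serializable {
        String call(String line);
    }

    // Plays the role of the outer foreachRDD function: serializable itself,
    // but holding a field that is not.
    static class Outer implements Serializable {
        private final FakeJob job = new FakeJob();
        private final String suffix;

        Outer(String suffix) {
            this.suffix = suffix;
        }

        // Anonymous inner class: reading 'suffix' goes through a hidden
        // reference to this Outer, so Outer (and its job) gets serialized too.
        Task anonymousTask() {
            return new Task() {
                @Override
                public String call(String line) {
                    return line + suffix;
                }
            };
        }

        // Static nested class: receives only the value it needs.
        Task staticTask() {
            return new StaticTask(suffix);
        }
    }

    static class StaticTask implements Task {
        private final String suffix;

        StaticTask(String suffix) {
            this.suffix = suffix;
        }

        @Override
        public String call(String line) {
            return line + suffix;
        }
    }

    // Returns false when Java serialization hits a non-serializable object.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Outer outer = new Outer("fc");
        System.out.println("anonymous inner task serializable: "
                + isSerializable(outer.anonymousTask()));
        System.out.println("static nested task serializable: "
                + isSerializable(outer.staticTask()));
    }
}
```

If this is indeed what happens in the job above, moving the PairFunction
into a static nested or top-level class, so that it no longer references
the function that uses the Job, might avoid the exception.
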
Thanks
-kvk