bq. return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
I don't think Put is serializable.

FYI

On Fri, Jun 12, 2015 at 6:40 AM, Vamshi Krishna <vamshi2...@gmail.com> wrote:

> Hi,
> I am trying to write data that is produced by the Kafka command-line
> producer for some topic. I am facing a problem and unable to proceed.
> Below is my code, which I build into a jar and run through spark-submit.
> Am I doing something wrong inside foreachRDD()? What is wrong with the
> SparkKafkaDemo$2.call(SparkKafkaDemo.java:63) line in the error message
> below?
>
>         SparkConf sparkConf = new SparkConf()
>                 .setAppName("JavaKafkaDemo")
>                 .setMaster("local")
>                 .setSparkHome("/Users/kvk/softwares/spark-1.3.1-bin-hadoop2.4");
>         // Create the context with a 5 second batch interval
>         JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(5000));
>
>         int numThreads = 2;
>         Map<String, Integer> topicMap = new HashMap<String, Integer>();
>         // topicMap.put("viewTopic", numThreads);
>         topicMap.put("nonview", numThreads);
>
>         JavaPairReceiverInputDStream<String, String> messages =
>                 KafkaUtils.createStream(jsc, "localhost", "ViewConsumer", topicMap);
>
>         JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>             @Override
>             public String call(Tuple2<String, String> tuple2) {
>                 return tuple2._2();
>             }
>         });
>
>         lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
>             @Override
>             public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
>
>                 JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = stringJavaRDD.mapToPair(
>                         new PairFunction<String, ImmutableBytesWritable, Put>() {
>                             @Override
>                             public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {
>                                 Put put = new Put(Bytes.toBytes("Rowkey" + Math.random()));
>                                 put.addColumn(Bytes.toBytes("firstFamily"), Bytes.toBytes("firstColumn"),
>                                         Bytes.toBytes(line + "fc"));
>                                 return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
>                             }
>                         });
>
>                 // save to HBase - Spark built-in API method
>                 hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration());
>                 return null;
>             }
>         });
>
>         jsc.start();
>         jsc.awaitTermination();
>
> I see the error below:
>
> ./bin/spark-submit --class "SparkKafkaDemo" --master local
>     /Users/kvk/IntelliJWorkspace/HbaseDemo/HbaseDemo.jar
>
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
>     at org.apache.spark.rdd.RDD.map(RDD.scala:286)
>     at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)
>     at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)
>     at SparkKafkaDemo$2.call(SparkKafkaDemo.java:63)
>     at SparkKafkaDemo$2.call(SparkKafkaDemo.java:60)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at scala.util.Try$.apply(Try.scala:161)
>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
>     at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283)
>     at org.apache.hadoop.mapreduce.Job.toString(Job.java:452)
>     at java.lang.String.valueOf(String.java:2847)
>     at java.lang.StringBuilder.append(StringBuilder.java:128)
>     at scala.StringContext.standardInterpolator(StringContext.scala:122)
>     at scala.StringContext.s(StringContext.scala:90)
>     at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:103)
>     at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
>     at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
>     at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
>     at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
>     at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
>     at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
>     at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>     ... 24 more
>
> Thanks
> -kvk
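
In case it helps, below is a minimal, untested sketch (not your code) of one way to keep non-serializable objects out of the closures Spark has to serialize: the Put-building function is a static nested class, so it does not drag an enclosing instance into the task closure, and the Hadoop Job standing in for your newAPIJobConfiguration1 is built on the driver inside foreachRDD() right before saveAsNewAPIHadoopDataset(). The table name, column family, and ZooKeeper quorum below are made-up placeholders.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public final class SparkKafkaHBaseSketch {

    // Static nested class: it captures no enclosing instance, so only this
    // function object itself has to be serializable.
    static class LineToPut implements PairFunction<String, ImmutableBytesWritable, Put> {
        @Override
        public Tuple2<ImmutableBytesWritable, Put> call(String line) throws Exception {
            Put put = new Put(Bytes.toBytes("Rowkey" + Math.random()));
            put.addColumn(Bytes.toBytes("firstFamily"), Bytes.toBytes("firstColumn"),
                    Bytes.toBytes(line + "fc"));
            return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
        }
    }

    public static void main(String[] args) throws Exception {
        // local[2] so the Kafka receiver and the batch processing each get a thread.
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaDemo").setMaster("local[2]");
        JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(5000));

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("nonview", 2);

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jsc, "localhost", "ViewConsumer", topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd) throws Exception {
                JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = rdd.mapToPair(new LineToPut());

                // The Hadoop Job (and its Configuration) is created here on the
                // driver for each batch; it is never referenced from a task closure.
                hbasePuts.saveAsNewAPIHadoopDataset(newOutputJob().getConfiguration());
                return null;
            }
        });

        jsc.start();
        jsc.awaitTermination();
    }

    // Hypothetical helper standing in for newAPIJobConfiguration1 in the original code.
    private static Job newOutputJob() throws IOException {
        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", "localhost");       // assumption
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "demoTable"); // hypothetical table
        Job job = Job.getInstance(hbaseConf);
        job.setOutputFormatClass(TableOutputFormat.class);
        return job;
    }
}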