Hi Armaan,

> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord

perhaps you should ask that question on the Spark mailing list, which should increase your chances of getting a good response for this Spark error. You should also share the Spark and Kafka versions you use.

-Michael

On Fri, Mar 10, 2017 at 7:34 PM, Armaan Esfahani <armaan.esfah...@advancedopen.com> wrote:

> Hello, I have been trying to set up a SMACK stack to learn the basics of
> Kafka Streams and Spark, yet I keep coming across the following error:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0.0 in stage 0.0 (TID 0) had a not serializable result:
> org.apache.kafka.clients.consumer.ConsumerRecord
>
> I have a "Tweet" object, which is a simple POJO with a Date and a String,
> and it has a Serializer and a Deserializer class.
>
> I have tested creating an object, serializing it to a local file, then
> reading it with the deserializer, and it works fine; however, over the
> stream it fails.
>
> To read the data from the Kafka stream, I have set up an input stream
> using the following code:
>
>     Map<String, Object> kafkaParams = new HashMap<>();
>     kafkaParams.put("bootstrap.servers", brokers);
>     kafkaParams.put("key.deserializer", StringDeserializer.class);
>     kafkaParams.put("value.deserializer", TweetDeserializer.class);
>     kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>
>     JavaInputDStream<ConsumerRecord<String, Tweet>> tweets =
>         KafkaUtils.createDirectStream(
>             jssc,
>             LocationStrategies.PreferConsistent(),
>             ConsumerStrategies.<String, Tweet>Subscribe(topicsSet, kafkaParams)
>         );
>
> To send a sample object to Kafka, I have the following for testing:
>
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "192.168.194.194:9092");
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer", "com.armaanaki.smack.tweet.TweetSerializer");
>
>     final KafkaProducer<String, Tweet> kafkaProducer =
>         new KafkaProducer<String, Tweet>(props);
>
>     ProducerRecord<String, Tweet> record =
>         new ProducerRecord<String, Tweet>("tweets1", "1", new Tweet(new Date(), "Test"));
>     kafkaProducer.send(record);
>
> Can anyone explain my error? Thanks!
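
For what it is worth, one likely reading of the error above is that ConsumerRecord does not implement java.io.Serializable, so Spark cannot ship it from an executor back to the driver with its default Java serialization. The Kafka Serializer/Deserializer pair only governs how bytes travel to and from the broker and does not help here. Below is a minimal sketch of that distinction in plain Java, without Spark or Kafka on the classpath. FakeRecord is a hypothetical stand-in for ConsumerRecord, the Tweet class is an assumed shape (the original was not posted), and isJavaSerializable is an illustrative helper, not a Spark or Kafka API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Date;

public class SerializationSketch {

    // Hypothetical stand-in for ConsumerRecord: deliberately NOT Serializable.
    public static final class FakeRecord<K, V> {
        private final K key;
        private final V value;

        public FakeRecord(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public K key() { return key; }
        public V value() { return value; }
    }

    // Assumed shape of the Tweet POJO; it must implement Serializable
    // if it is to survive Java serialization.
    public static final class Tweet implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Date created;
        private final String text;

        public Tweet(Date created, String text) {
            this.created = created;
            this.text = text;
        }

        public Date created() { return created; }
        public String text() { return text; }
    }

    // Returns true if the object can be written with plain Java serialization.
    public static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeRecord<String, Tweet> record =
            new FakeRecord<>("1", new Tweet(new Date(), "Test"));

        // The wrapper cannot be serialized, but the payload can.
        System.out.println("record serializable: " + isJavaSerializable(record));
        System.out.println("value serializable:  " + isJavaSerializable(record.value()));
    }
}
```

If this is indeed the cause, a common workaround in the Spark job is to extract the payload before any operation that returns results to the driver, for example tweets.map(record -> record.value()), and to make sure Tweet itself implements java.io.Serializable (or is registered with whatever serializer the job is configured to use). Treat this as an assumption to verify against your Spark and Kafka versions.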