Hello, I have been trying to set up a SMACK stack to learn the basics of Kafka 
Streams and Spark, but I keep running into the following error: 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in 
stage 0.0 (TID 0) had a not serializable result: 
org.apache.kafka.clients.consumer.ConsumerRecord

 

I have a “Tweet” object, a simple POJO with a Date and a String, along with a 
corresponding Serializer and Deserializer class. 
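
For reference, the Tweet class itself looks roughly like this (getters/setters 
and the Serializer/Deserializer classes are omitted, and the field names here 
are only illustrative):

    // Simple POJO holding a timestamp and the tweet text
    public class Tweet {
        private Date createdAt;
        private String text;

        public Tweet() {}

        public Tweet(Date createdAt, String text) {
            this.createdAt = createdAt;
            this.text = text;
        }

        // getters and setters omitted for brevity
    }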

 

I have tested creating an object, serializing it to a local file, then reading 
it back with the deserializer, and it works fine. Over the stream, however, it fails.
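
That local test was roughly the following (the file name and the getter are 
illustrative; TweetSerializer/TweetDeserializer implement the standard Kafka 
Serializer/Deserializer interfaces, so they take a topic argument that they 
ignore here):

    TweetSerializer serializer = new TweetSerializer();
    TweetDeserializer deserializer = new TweetDeserializer();

    // Serialize a sample Tweet to bytes and write it to a local file
    Tweet original = new Tweet(new Date(), "Test");
    byte[] bytes = serializer.serialize("tweets1", original);
    Files.write(Paths.get("tweet.bin"), bytes);

    // Read the bytes back and deserialize them into a Tweet again
    Tweet restored = deserializer.deserialize("tweets1", Files.readAllBytes(Paths.get("tweet.bin")));
    System.out.println(restored.getText()); // prints "Test" as expected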

 

To read the data from the Kafka stream, I have set up an input stream using 
the following code:

 

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", TweetDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");

JavaInputDStream<ConsumerRecord<String, Tweet>> tweets =
        KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, Tweet>Subscribe(topicsSet, kafkaParams)
        );

 

To send a sample object to Kafka, I have the following for testing:

                                

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.194.194:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.armaanaki.smack.tweet.TweetSerializer");

final KafkaProducer<String, Tweet> kafkaProducer = new KafkaProducer<String, Tweet>(props);

ProducerRecord<String, Tweet> record = new ProducerRecord<String, Tweet>("tweets1", "1", new Tweet(new Date(), "Test"));
kafkaProducer.send(record);

 

 

Can anyone explain my error? Thanks!
