Hi Armaan,

> org.apache.spark.SparkException: Job aborted due to stage failure:
>    Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
org.apache.kafka.clients.consumer.ConsumerRecord

perhaps you should ask that question in the Spark mailing list, which
should increase your chances of getting a good response for this Spark
error.  You should also share the Spark and Kafka versions you use.

-Michael



On Fri, Mar 10, 2017 at 7:34 PM, Armaan Esfahani <
armaan.esfah...@advancedopen.com> wrote:

> Hello, I have been trying to setup a SMACK stack to learn the basics of
> Kafka Streams and Spark, yet I keep coming across the following error:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0.0 in stage 0.0 (TID 0) had a not serializable result:
> org.apache.kafka.clients.consumer.ConsumerRecord
>
>
>
> I have a “Tweet” object which is a simple POJO with a Date and String that
> then has a Serializer and Deserializer class.
>
>
>
> I have tested creating an object, serializing it to a local file, then
> reading it with the deserializer and it works fine—however over the stream
> it fails.
>
>
>
> To read the data from the kafka stream , I have setup a an input stream
> using the following code:
>
>
>
> Map <String, Object> kafkaParams = new HashMap<>();
>
> kafkaParams.put("bootstrap.servers", brokers);
>
> kafkaParams.put("key.deserializer", StringDeserializer.class);
>
> kafkaParams.put("value.deserializer", TweetDeserializer.class);
>
> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>
>
>
> JavaInputDStream<ConsumerRecord<String, Tweet>> tweets =
> KafkaUtils.createDirectStream(
>
>                 jssc,
>
>                 LocationStrategies.PreferConsistent(),
>
>                 ConsumerStrategies.<String, Tweet>Subscribe(topicsSet,
> kafkaParams)
>
>                 );
>
>
>
> To send a sample object to Kafka, I have the following for testing:
>
>
>
> Properties props = new Properties();
>
> props.put("bootstrap.servers", "192.168.194.194:9092");
>
> props.put("key.serializer", "org.apache.kafka.common.serialization.
> StringSerializer");
>
> props.put("value.serializer", "com.armaanaki.smack.tweet.
> TweetSerializer");
>
>
>
> final KafkaProducer<String, Tweet> kafkaProducer = new
> KafkaProducer<String, Tweet>(props);
>
>
>
> ProducerRecord<String, Tweet> record = new ProducerRecord<String,
> Tweet>("tweets1", "1", new Tweet(new Date(), "Test"));
>
> kafkaProducer.send(record);
>
>
>
>
>
> Can anyone explain my error? Thanks!
>
>

Reply via email to