Hi,

                I have a requirement to create a Spark Streaming job that gets 
data from a Kafka broker and applies a window function to the data coming 
into the Spark context.
This is how I connected to Kafka from Spark:

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "<my-srvername>",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "7b37787c-20e7-4614-98ba-6f4212e07bf0",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

val topics = Array("7b37787c-20e7-4614-98ba-6f4212e07bf0")
val inputMsg = KafkaUtils.createDirectStream[String,String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

The variable "inputMsg" is of type "InputDStream[ConsumerRecord[String, String]]".
When I use the window function "inputMsg.window(Minutes(1))", it throws the 
error below:

ERROR DirectKafkaInputDStream: Kafka ConsumerRecord is not serializable. Use 
.map to extract fields before calling .persist or .window

Can someone help me with how to use the window function in Spark Streaming 
with Kafka?

Regards,
Favas
