Have you looked at these?

http://allegro.tech/2015/08/spark-kafka-integration.html
http://mkuthan.github.io/blog/2016/01/29/spark-kafka-integration2/
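The core of the pattern those posts describe is a sink that is serializable because the KafkaProducer inside it is created lazily, on the executor, from a producer-building function; only that function gets serialized. A minimal sketch of the idea (the KafkaSink name and apply signature follow the shape of the linked posts, but this is illustrative rather than their exact code):

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Serializable wrapper: `producer` is a lazy val, so it is still
// uninitialized when the sink is serialized on the driver, and each
// executor builds its own producer on first use.
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, key: String, value: String): Unit =
    producer.send(new ProducerRecord(topic, key, value))
}

object KafkaSink {
  def apply(brokers: String): KafkaSink = {
    val createProducerFunc = () => {
      val props = new Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      // Close the executor-local producer when its JVM shuts down.
      sys.addShutdownHook { producer.close() }
      producer
    }
    new KafkaSink(createProducerFunc)
  }
}
```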
Full example here: https://github.com/mkuthan/example-spark-kafka

HTH.
-Todd

On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego <agall...@concord.io> wrote:
> Thanks Ted.
>
> KafkaWordCount (producer) does not operate on a DStream[T]:
>
> ```scala
> object KafkaWordCountProducer {
>
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
>         "<messagesPerSec> <wordsPerMessage>")
>       System.exit(1)
>     }
>
>     val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>
>     // Kafka producer connection properties
>     val props = new HashMap[String, Object]()
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>       "org.apache.kafka.common.serialization.StringSerializer")
>
>     val producer = new KafkaProducer[String, String](props)
>
>     // Send some messages
>     while (true) {
>       (1 to messagesPerSec.toInt).foreach { messageNum =>
>         val str = (1 to wordsPerMessage.toInt)
>           .map(x => scala.util.Random.nextInt(10).toString)
>           .mkString(" ")
>
>         val message = new ProducerRecord[String, String](topic, null, str)
>         producer.send(message)
>       }
>
>       Thread.sleep(1000)
>     }
>   }
> }
> ```
>
> Also, doing:
>
> ```scala
> object KafkaSink {
>   def send(brokers: String, sc: SparkContext, topic: String, key: String, value: String) =
>     getInstance(brokers, sc).value.send(new ProducerRecord(topic, key, value))
> }
>
> KafkaSink.send(brokers, sparkContext, outputTopic, record._1, record._2)
> ```
>
> doesn't work either; the result is:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>
> Thanks!
>
> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > In KafkaWordCount, the String is sent back and producer.send() is called.
> >
> > I guess if you can't find a solution within your current design, you can consider the above.
> >
> > On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <agall...@concord.io> wrote:
> >>
> >> Hello,
> >>
> >> I understand that you cannot serialize a KafkaProducer.
> >>
> >> So I've tried the following (as suggested here:
> >> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html):
> >>
> >> - Make the class Serializable - not possible.
> >>
> >> - Declare the instance only within the lambda function passed in map, via:
> >>
> >> ```scala
> >> // as suggested by the docs
> >> kafkaOut.foreachRDD(rdd => {
> >>   rdd.foreachPartition(partition => {
> >>     val producer = new KafkaProducer(..)
> >>     partition.foreach { record =>
> >>       producer.send(new ProducerRecord(outputTopic, record._1, record._2))
> >>     }
> >>     producer.close()
> >>   })
> >> }) // foreachRDD
> >> ```
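A side note on the per-partition attempt just above: even though the producer is created on the executor, the closure can still throw "Task not serializable" if `outputTopic` or `brokers` is a field of a non-serializable enclosing class, because referencing a field captures `this`. A minimal sketch of the usual workaround, assuming those names are fields on a driver-side job class (a hypothetical setup, not code from the thread):

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Copy fields into local vals so the closures capture plain values
// instead of dragging in the (non-serializable) enclosing instance.
val topic = outputTopic
val brokerList = brokers

kafkaOut.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val props = new Properties()
    props.put("bootstrap.servers", brokerList)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // One producer per partition, created and closed on the executor.
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { record =>
      producer.send(new ProducerRecord(topic, record._1, record._2))
    }
    producer.close()
  }
}
```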
> >> - Make the NotSerializable object a static and create it once per machine, via:
> >>
> >> ```scala
> >> object KafkaSink {
> >>   @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null
> >>
> >>   def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
> >>     if (instance == null) {
> >>       synchronized {
> >>         println("Creating new kafka producer")
> >>         val props = new java.util.Properties()
> >>         .......
> >>         instance = sc.broadcast(new KafkaProducer[String, String](props))
> >>         sys.addShutdownHook {
> >>           instance.value.close()
> >>         }
> >>       }
> >>     }
> >>     instance
> >>   }
> >> }
> >> ```
> >>
> >> - Call rdd.foreachPartition and create the NotSerializable object in there, like this:
> >>
> >> Same as above.
> >>
> >> - Mark the instance @transient.
> >>
> >> Same thing, just make it a class variable, via:
> >>
> >> ```scala
> >> @transient var producer: KafkaProducer[String, String] = null
> >>
> >> def getInstance() = {
> >>   if (producer == null) {
> >>     producer = new KafkaProducer()
> >>   }
> >>   producer
> >> }
> >> ```
> >>
> >> However, I get serialization problems with all of these options.
> >>
> >> Thanks for your help.
> >>
> >> - Alex
>
> --
>
> Alexander Gallego
> Co-Founder & CTO
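One closing observation on the attempts above: sc.broadcast has to serialize its argument, so broadcasting a KafkaProducer directly fails for the same reason the closures do. Broadcasting a lazy wrapper like the KafkaSink sketched under the links at the top avoids this, because only the producer-building closure crosses the wire. A usage sketch, reusing `kafkaOut`, `brokers`, and `outputTopic` as they appear in the thread's snippets:

```scala
// Build and broadcast the serializable sink once, on the driver.
val kafkaSink = sparkContext.broadcast(KafkaSink(brokers))

kafkaOut.foreachRDD { rdd =>
  rdd.foreach { record =>
    // Each executor lazily creates its own producer on first send;
    // the broadcast value is reused across batches and partitions.
    kafkaSink.value.send(outputTopic, record._1, record._2)
  }
}
```

The @transient variant fails for a related reason: the annotation keeps the field out of serialization, but the closure still captures the enclosing instance that holds it, and that instance must itself be serializable.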