new Properties()
> props.put("metadata.broker.list", "localhost:9092")
> props.put("serializer.class", "kafka.serializer.StringEncoder")
>
> val config= new ProducerConfig(props)
> val producer= new Producer[String,String](config)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val config= new ProducerConfig(props)
val producer= new Producer[String,String](config)
val x=p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")
ring,String](config)
val x=p.collect().mkString("\n")
producer.send(new KeyedMessage[String, String]("trade", x))
}
}
Thanks
Sri
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaProducer-using-Cassandra-as-source-tp247