Hi Devs,
I just started using Flink and would like to ass kafka as Sink. I went
through the documentation but so far I've not succeeded in writing to Kafka
from Flink....
I' building application in Scala.... Here is my code snippet
case class *Demo*(city: String, country: String, zipcode: Int)
The map stage returns an instance of Demo type
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
properties.setProperty("group.id", "test_topic")
val mapToDemo: String => Demo = {//Implementation}
val stream = env.addSource(new FlinkKafkaConsumer082[String]("test", new
SimpleStringSchema, properties))
stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("127.0.0.1:9092",
"test_topic", new SimpleStringSchema()))
Can anyone explain me what am I doing wrong in adding Kafka as Sink ?
--
Thanks,
Deepak Jha