Hi, a MapFunction should be the way to go for this use case. What exactly is not working? Do you get an exception? Is the map method not called?
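In case it helps to compare, here is a minimal plain-JDK sketch of the per-message write pattern. It is not Flink code: `MapFunction` below is a simplified local stand-in shaped like Flink's interface, and `DbSession`, `writeEach`, and the counter used as the key are hypothetical names made up for illustration. It also binds the message as a statement parameter, since in the snippet as quoted `value` appears inside the string literal and so would not carry the message itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MapSideEffectSketch {

    // Simplified stand-in shaped like Flink's MapFunction<T, O> (assumption:
    // the real interface also declares "throws Exception").
    interface MapFunction<T, O> {
        O map(T value);
    }

    // Hypothetical database session; it only records what it is asked to run.
    static class DbSession {
        final List<String> executed = new ArrayList<>();

        void execute(String statement, Object... params) {
            executed.add(statement + " <- " + Arrays.toString(params));
        }
    }

    // Builds a map function that writes every message and passes it through.
    // The message is bound as a parameter instead of being typed into the SQL text.
    static MapFunction<String, String> writeEach(DbSession session) {
        return new MapFunction<String, String>() {
            private long key = 0; // hypothetical key source, for illustration only

            @Override
            public String map(String value) {
                session.execute(
                    "insert into TABLE_XXX (key, event_timeuuid, data) VALUES (?, null, ?)",
                    key++, value);
                return value;
            }
        };
    }

    public static void main(String[] args) {
        DbSession session = new DbSession();
        MapFunction<String, String> fn = writeEach(session);
        // Stand-in for the stream: call map() once per message.
        for (String message : new String[] {"a", "b"}) {
            fn.map(message);
        }
        System.out.println(session.executed);
    }
}
```

In an actual Flink job the map function only runs once the job has been started with the `execute()` method of the `StreamExecutionEnvironment`, so that is worth checking if the map method never seems to be called.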
Best, Fabian

2016-11-03 0:00 GMT+01:00 Sandeep Vakacharla <svakacha...@fanatics.com>:

> Hi there,
>
> I have the following use case:
>
> I have data coming from Kafka which I need to stream and write each
> message to a database. I'm using the kafka-flink connector for streaming
> data from Kafka. I don't want to use Flink sinks to write data from the
> stream.
>
> I'm doing the following, which doesn't seem to work:
>
> messageStream
>     .rebalance()
>     .map(new MapFunction<String, Object>() {
>         @Override
>         public String map(String value) {
>             getDbSession().execute("insert into TABLE_XXX (key, event_timeuuid, data) " +
>                 "VALUES (" + i + ",null, value); ");
>             return value;
>         }
>     })
>
> How can I iterate over each message in the stream and do something with
> that message?
>
> Thanks