Hi there, I have the following use case-
I have data coming from Kafka which I need to stream and write each message to a database. I’m using kafka-flink connector for streaming data from Kafka. I don’t want to use flink sinks to write date from stream. I’m doing the following which doesn’t seem to work- messageStream .rebalance() .map(new MapFunction<String, Object>() { @Override public String map(String value) { getDbSession().execute("insert into TABLE_XXX (key, event_timeuuid, data) " + "VALUES ("+ i+",null, value); "); return value; } }) How can I iterate over each message in the stream and do something with that message? Thanks Information contained in this e-mail message is confidential. This e-mail message is intended only for the personal use of the recipient(s) named above. If you are not an intended recipient, do not read, distribute or reproduce this transmission (including any attachments). If you have received this email in error, please immediately notify the sender by email reply and delete the original message.