Hello, I recently started programming with Apache Flink API. I am trying to get input directly from kafka in a JSON format with the following code:
/private void kafkaConsumer(String server, String topic) { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", server); properties.setProperty("group.id", "Demo"); stream = environment.addSource(new FlinkKafkaConsumer09<>(topic, new JSONDeserializationSchema(), properties)) .map(new MapFunction<ObjectNode, Event>() { @Override public Event map(ObjectNode value) throws Exception { return new Event(Integer.parseInt(value.get("id").asText()), value.get("user").asText(), value.get("action").asText(), value.get("ip").asText()); } }); }/ But I alwys get the following error: /17:56:46,335 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed. com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input at [Source: [B@69a90966; line: 1, column: 1] at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148) at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3095) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3036) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215) at org.apache.flink.streaming.util.serialization.JSONDeserializationSchema.deserialize(JSONDeserializationSchema.java:38) at org.apache.flink.streaming.util.serialization.JSONDeserializationSchema.deserialize(JSONDeserializationSchema.java:30) at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227) at java.lang.Thread.run(Thread.java:745)/ What am I doing wrong? Attached follows the JSON sample that I am using. Thank you and Regards. log.json <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n9536/log.json> -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JsonMappingException-No-content-to-map-due-to-end-of-input-tp9536.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.