Hi, I’ll use your code to explain.
public IndexRequest createIndexRequest(String element){ HashMap<String, Object> esJson = new HashMap<>(); esJson.put("data", element); What you should do here is parse the field values from `element`, and simply treat them as key-value pairs of the `esJson` map. So, the `esJson` should be prepared by doing: esJson.put(“id”, 6); esJson.put(“name”, “A green door”); esJson.put(“price”, 12.5); etc. Cheers, Gordon On February 21, 2017 at 12:41:40 AM, Fábio Dias (fabiodio...@gmail.com) wrote: Hi, I'm using Flink and Elasticsearch and I want to recieve in elasticsearch a json object ({"id":1, "name":"X"} ect...), I already have a string with this information, but I don't want to save it as string. I recieve this: { "_index": "logs", "_type": "object", "_id": "AVpcARfkfYWqSubr0ZvK", "_score": 1, "_source": { "data": "{\"id\":6,\"name\":\"A green door\",\"price\":12.5,\"tags\":[\"home\",\"green\"]}" } } And I want to recieve this: { "_index": "logs", "_type": "external", "_id": "AVpcARfkfYWqSubr0ZvK", "_score": 1, "_source": { "data": { "id":6, "name":"A green door", "price":12.5, "tags": ["home","green"] } } } my java code: try { ArrayList<InetSocketAddress> transports = new ArrayList<>(); transports.add(new InetSocketAddress("127.0.0.1", 9300)); ElasticsearchSinkFunction<String> indexLog = new ElasticsearchSinkFunction<String>() { private static final long serialVersionUID = 8802869701292023100L; public IndexRequest createIndexRequest(String element){ HashMap<String, Object> esJson = new HashMap<>(); esJson.put("data", element); return Requests .indexRequest() .index("logs") .type("object") .source(esJson); } @Override public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { indexer.add(createIndexRequest(element)); } }; ElasticsearchSink<String> esSink = new ElasticsearchSink<String>(config, transports, indexLog); input.addSink(esSink); } catch (Exception e) { System.out.println(e); } Do I need to treat every entry as a map? Can I just send a object with key value? Thanks.