Hi,

We use Kafka -> Flink -> Elasticsearch in our project. The data is not flushed to Elasticsearch until the next batch arrives. For example, if the first batch contains 1000 packets, it is pushed to Elasticsearch only after the next batch arrives, irrespective of whether the bulk flush interval has been reached. Below are the sink configurations we currently use:
esSinkBuilder.setBulkFlushMaxActions(2000);   // 2K records
esSinkBuilder.setBulkFlushMaxSizeMb(5);       // 5 MB
esSinkBuilder.setBulkFlushInterval(60000);    // 1 minute
esSinkBuilder.setBulkFlushBackoffRetries(3);  // retry three times if a bulk request fails
esSinkBuilder.setBulkFlushBackoffDelay(5000); // retry after 5 seconds
esSinkBuilder.setBulkFlushBackoff(true);

Sink code:

List<HttpHost> httpHosts = new ArrayList<>();
//httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));

ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<Row>() {

            // Index into a daily index: esIndex + yyyy.MM.dd suffix.
            private IndexRequest createIndexRequest(byte[] document, String indexDate) {
                return new IndexRequest(esIndex + indexDate, esType)
                        .source(document, XContentType.JSON);
            }

            @Override
            public void process(Row r, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                byte[] byteArray = serializationSchema.serialize(r);
                ObjectMapper mapper = new ObjectMapper();
                ObjectWriter writer = mapper.writer();
                try {
                    // Derive the daily index suffix from the record's "fseen" timestamp (epoch millis).
                    JsonNode jsonNode = mapper.readTree(byteArray);
                    long tumbleStart = jsonNode.get("fseen").asLong();
                    ZonedDateTime utc = Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
                    String indexDate = DateTimeFormatter.ofPattern("yyyy.MM.dd").format(utc);
                    byte[] document = writer.writeValueAsBytes(jsonNode);
                    requestIndexer.add(createIndexRequest(document, indexDate));
                } catch (Exception e) {
                    System.out.println("In the error block");
                }
            }
        }
);

Has anyone faced this issue? Any help would be appreciated!

Thanks,
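P.S. For completeness, this is roughly how the builder above is turned into a sink and attached to the stream. The environment/stream names, the dummy source and the job name below are simplified placeholders, not the exact code from our job; in the real pipeline the Row stream comes from the Kafka source and the windowing step.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Placeholder source: in the real job this stream is produced by the Kafka
// consumer and the Flink windowing that emits Row records.
DataStream<Row> rowStream = env
        .fromElements(Row.of(System.currentTimeMillis()))
        .returns(Types.ROW(Types.LONG));

// esSinkBuilder is the builder configured above; build() creates the
// ElasticsearchSink that is attached to the stream.
rowStream.addSink(esSinkBuilder.build());

env.execute("kafka-flink-elasticsearch");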