Hi,

We use Kafka->Flink->Elasticsearch in our project.
The data is not flushed to Elasticsearch until the next batch arrives.
E.g., if the first batch contains 1000 packets, it gets pushed to Elasticsearch
only after the next batch arrives [irrespective of the bulk flush interval
being reached].
Below is the sink configuration we currently use:

esSinkBuilder.setBulkFlushMaxActions(2000);   // flush after 2,000 records
esSinkBuilder.setBulkFlushMaxSizeMb(5);       // or after 5 MB of data
esSinkBuilder.setBulkFlushInterval(60000);    // or every 1 minute
esSinkBuilder.setBulkFlushBackoffRetries(3);  // retry three times if a bulk request fails
esSinkBuilder.setBulkFlushBackoffDelay(5000); // retry after 5 seconds
esSinkBuilder.setBulkFlushBackoff(true);

Sink code:
List<HttpHost> httpHosts = new ArrayList<>();
//httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));

ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<Row>() {

            // Builds an index request against a daily index (esIndex + yyyy.MM.dd)
            private IndexRequest createIndexRequest(byte[] document, String indexDate) {
                return new IndexRequest(esIndex + indexDate, esType)
                        .source(document, XContentType.JSON);
            }

            @Override
            public void process(Row r, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                byte[] byteArray = serializationSchema.serialize(r);

                ObjectMapper mapper = new ObjectMapper();
                ObjectWriter writer = mapper.writer();

                try {
                    JsonNode jsonNode = mapper.readTree(byteArray);

                    // derive the daily index suffix from the "fseen" timestamp (epoch millis)
                    long tumbleStart = jsonNode.get("fseen").asLong();
                    ZonedDateTime utc = Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
                    String indexDate = DateTimeFormatter.ofPattern("yyyy.MM.dd").format(utc);

                    byte[] document = writer.writeValueAsBytes(jsonNode);

                    requestIndexer.add(createIndexRequest(document, indexDate));
                } catch (Exception e) {
                    System.out.println("Failed to build index request: " + e);
                }
            }
        }
);
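
For context, the sink is attached to the stream roughly as sketched below. This
is abbreviated: the topic name, deserializationSchema, kafkaProps and the job
name are placeholders rather than our exact code, and the bulk flush settings
above are applied to esSinkBuilder before build().

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka source feeding the pipeline (placeholder topic / schema / properties)
FlinkKafkaConsumer<Row> source =
        new FlinkKafkaConsumer<>("topic", deserializationSchema, kafkaProps);

DataStream<Row> rows = env.addSource(source);

// attach the Elasticsearch sink built above
rows.addSink(esSinkBuilder.build());

env.execute("kafka-to-elasticsearch");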

Has anyone faced this issue? Any help would be appreciated!

Thanks,
