Hi,
We use Kafka->Flink->Elasticsearch in our project.
The data is not flushed to Elasticsearch until the next batch arrives.
For example, if the first batch contains 1,000 records, it is pushed to Elasticsearch
only after the next batch arrives, regardless of whether the bulk flush interval has elapsed.
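For context, the source side of the job looks roughly like this (the topic name, Kafka
properties, deserialization schema, and variable names here are placeholders, not the exact
ones from our job; env is the StreamExecutionEnvironment):

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "kafka:9092");   // placeholder
    kafkaProps.setProperty("group.id", "flink-es-job");          // placeholder

    // Kafka source feeding the stream that ends in the Elasticsearch sink shown below
    FlinkKafkaConsumer<Row> kafkaSource =
            new FlinkKafkaConsumer<>("input-topic", rowDeserializationSchema, kafkaProps);
    DataStream<Row> rows = env.addSource(kafkaSource);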
Below is the sink configuration we currently use:
esSinkBuilder.setBulkFlushMaxActions(2000);    // 2K records
esSinkBuilder.setBulkFlushMaxSizeMb(5);        // 5 MB
esSinkBuilder.setBulkFlushInterval(60000);     // once a minute
esSinkBuilder.setBulkFlushBackoffRetries(3);   // retry three times if a bulk request fails
esSinkBuilder.setBulkFlushBackoffDelay(5000);  // retry after 5 seconds
esSinkBuilder.setBulkFlushBackoff(true);
Sink code:

List<HttpHost> httpHosts = new ArrayList<>();
//httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));

ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<Row>() {

            private IndexRequest createIndexRequest(byte[] document, String indexDate) {
                return new IndexRequest(esIndex + indexDate, esType)
                        .source(document, XContentType.JSON);
            }

            @Override
            public void process(Row r, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                byte[] byteArray = serializationSchema.serialize(r);
                ObjectMapper mapper = new ObjectMapper();
                ObjectWriter writer = mapper.writer();
                try {
                    JsonNode jsonNode = mapper.readTree(byteArray);
                    long tumbleStart = jsonNode.get("fseen").asLong();
                    ZonedDateTime utc = Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
                    String indexDate = DateTimeFormatter.ofPattern("yyyy.MM.dd").format(utc);
                    byte[] document = writer.writeValueAsBytes(jsonNode);
                    requestIndexer.add(createIndexRequest(document, indexDate));
                } catch (Exception e) {
                    System.out.println("In the error block");
                }
            }
        }
);
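The sink is then built and attached at the end of the pipeline, roughly like this
("rows" is a placeholder for the actual DataStream<Row> in our job):

    // Build the sink from the builder above and attach it to the result stream.
    rows.addSink(esSinkBuilder.build());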
Has anyone faced this issue? Any help would be appreciated!
Thanks,