Yes, we do have checkpointing enabled (every 5 minutes):
env.enableCheckpointing(300000);

But we assumed that was only for the Kafka consumer offsets. We are not sure
how it helps in this case. Could you please elaborate?
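
For reference, this is roughly how checkpointing is set up on our side (a
minimal sketch; only the enableCheckpointing call above is from our actual
job, the surrounding setup and the exactly-once mode are assumptions):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 5 minutes (300000 ms), as in the line above.
env.enableCheckpointing(300000);
// Assumed default: exactly-once checkpointing mode.
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);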

~Ramya.



On Fri, Jun 21, 2019 at 4:33 PM miki haiat <miko5...@gmail.com> wrote:

> Did you set up any checkpointing configuration?
>
> On Fri, Jun 21, 2019, 13:17 Ramya Ramamurthy <hair...@gmail.com> wrote:
>
> > Hi,
> >
> > We use Kafka -> Flink -> Elasticsearch in our project.
> > The data is not getting flushed to Elasticsearch until the next batch arrives.
> > E.g.: if the first batch contains 1000 packets, it is pushed to Elasticsearch
> > only after the next batch arrives, irrespective of whether the batch time
> > limit has been reached.
> > Below are the sink configurations we currently use.
> >
> > esSinkBuilder.setBulkFlushMaxActions(2000); // flush once 2000 records are buffered
> > esSinkBuilder.setBulkFlushMaxSizeMb(5); // or once 5 MB are buffered
> > esSinkBuilder.setBulkFlushInterval(60000); // or once 1 minute has passed
> > esSinkBuilder.setBulkFlushBackoffRetries(3); // retry a failed bulk request three times
> > esSinkBuilder.setBulkFlushBackoffDelay(5000); // with a 5 second delay between retries
> > esSinkBuilder.setBulkFlushBackoff(true);
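> >
> > From these settings our expectation is that a bulk flush fires as soon as
> > any one of the three thresholds is reached, so for the 1000-packet example
> > only the 60 second interval should matter. A rough sketch of that
> > expectation (the ~1 KB average document size is only an assumption for
> > illustration):
> >
> > long maxActions = 2000;                // flush once 2000 requests are buffered
> > long maxSizeBytes = 5L * 1024 * 1024;  // or once ~5 MB are buffered
> > long intervalMs = 60000;               // or once 60 s have passed since the last flush
> >
> > // Example batch: 1000 packets, assumed ~1 KB each (assumption for illustration only).
> > long batchActions = 1000;
> > long batchBytes = batchActions * 1024;
> >
> > boolean actionTrigger = batchActions >= maxActions; // false: 1000 < 2000
> > boolean sizeTrigger = batchBytes >= maxSizeBytes;   // false: ~1 MB < 5 MB
> > // So we expect the 60 s interval alone to push this batch out, even if no
> > // further batch arrives, which is not what we observe.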
> >
> > Sink code:
> >
> > List<HttpHost> httpHosts = new ArrayList<>();
> > //httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
> > httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));
> >
> > ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
> >         httpHosts,
> >         new ElasticsearchSinkFunction<Row>() {
> >
> >             private IndexRequest createIndexRequest(byte[] document, String indexDate) {
> >                 // Index the document as raw JSON into the daily index <esIndex><yyyy.MM.dd>.
> >                 return new IndexRequest(esIndex + indexDate, esType)
> >                         .source(document, XContentType.JSON);
> >             }
> >
> >             @Override
> >             public void process(Row r, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
> >                 byte[] byteArray = serializationSchema.serialize(r);
> >
> >                 ObjectMapper mapper = new ObjectMapper();
> >                 ObjectWriter writer = mapper.writer();
> >
> >                 try {
> >                     JsonNode jsonNode = mapper.readTree(byteArray);
> >
> >                     // Derive the daily index suffix from the "fseen" (window start) timestamp.
> >                     long tumbleStart = jsonNode.get("fseen").asLong();
> >                     ZonedDateTime utc = Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
> >                     String indexDate = DateTimeFormatter.ofPattern("yyyy.MM.dd").format(utc);
> >
> >                     byte[] document = writer.writeValueAsBytes(jsonNode);
> >                     requestIndexer.add(createIndexRequest(document, indexDate));
> >                 } catch (Exception e) {
> >                     System.out.println("In the error block");
> >                 }
> >             }
> >         }
> > );
> >
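> > The sink is then built and added to the stream roughly as below (simplified
> > sketch; "rowStream" is just a placeholder name for the windowed
> > DataStream<Row> that feeds the sink):
> >
> > ElasticsearchSink<Row> esSink = esSinkBuilder.build();
> > rowStream.addSink(esSink);
> >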
> > Has anyone faced this issue? Any help would be appreciated!
> >
> > Thanks,
> >
>
