Yes, we do have checkpointing enabled:

env.enableCheckpointing(300000); // every 5 minutes

But we assumed it is only for Kafka consumer offsets. I am not sure how it is useful in this case. Could you please elaborate?
~Ramya.

On Fri, Jun 21, 2019 at 4:33 PM miki haiat <miko5...@gmail.com> wrote:

> Did you set some checkpoints configuration?
>
> On Fri, Jun 21, 2019, 13:17 Ramya Ramamurthy <hair...@gmail.com> wrote:
>
> > Hi,
> >
> > We use Kafka->Flink->Elasticsearch in our project.
> > The data to the elasticsearch is not getting flushed, till the next batch
> > arrives.
> > E.g.: If the first batch contains 1000 packets, this gets pushed to the
> > Elastic, only after the next batch arrives [irrespective of reaching the
> > batch time limit].
> > Below are the sink configurations we use currently.
> >
> > esSinkBuilder.setBulkFlushMaxActions(2000); // 2K records
> > esSinkBuilder.setBulkFlushMaxSizeMb(5); // 5 Mb once
> > esSinkBuilder.setBulkFlushInterval(60000); // 1 minute once
> > esSinkBuilder.setBulkFlushBackoffRetries(3); // Retry three times if bulk fails
> > esSinkBuilder.setBulkFlushBackoffDelay(5000); // Retry after 5 seconds
> > esSinkBuilder.setBulkFlushBackoff(true);
> >
> > Sink code :
> > List<HttpHost> httpHosts = new ArrayList<>();
> > //httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
> > httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));
> >
> > ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
> >     httpHosts,
> >     new ElasticsearchSinkFunction<Row>() {
> >
> >         private IndexRequest createIndexRequest(byte[] document, String indexDate) {
> >             return new IndexRequest(esIndex + indexDate, esType)
> >                 .source(document, XContentType.JSON);
> >         }
> >
> >         @Override
> >         public void process(Row r, RuntimeContext runtimeContext,
> >                 RequestIndexer requestIndexer) {
> >             byte[] byteArray = serializationSchema.serialize(r);
> >
> >             ObjectMapper mapper = new ObjectMapper();
> >             ObjectWriter writer = mapper.writer();
> >
> >             try {
> >                 JsonNode jsonNode = mapper.readTree(byteArray);
> >
> >                 long tumbleStart = jsonNode.get("fseen").asLong();
> >
> >                 ZonedDateTime utc =
> >                     Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
> >                 String indexDate = DateTimeFormatter.ofPattern("yyyy.MM.dd").format(utc);
> >
> >                 byte[] document = writer.writeValueAsBytes(jsonNode);
> >
> >                 requestIndexer.add(createIndexRequest(document, indexDate));
> >             } catch (Exception e) {
> >                 System.out.println("In the error block");
> >             }
> >         }
> >     }
> > );
> >
> > Has anyone faced this issue? Any help would be appreciated !!
> >
> > Thanks,
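
P.S. To rule out the index naming as a factor, the date suffix logic from the sink code quoted above can be checked on its own. Below is a minimal stand-alone sketch, assuming "fseen" holds epoch milliseconds (as the Instant.ofEpochMilli call in the sink implies). The class name IndexDateSketch and the method name indexSuffix are made up for illustration and are not part of our job; only java.time is used, so it runs without Flink or Elasticsearch on the classpath.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: isolates the index-suffix computation from the sink.
final class IndexDateSketch {

    // Same pattern as in the sink code; the formatter is immutable and reusable.
    private static final DateTimeFormatter INDEX_DATE =
            DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Converts an epoch-millisecond timestamp to a UTC daily index suffix.
    static String indexSuffix(long epochMillis) {
        ZonedDateTime utc = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return INDEX_DATE.format(utc);
    }

    public static void main(String[] args) {
        // Midnight UTC on 21 June 2019 and the millisecond just before it.
        System.out.println(indexSuffix(1561075200000L)); // prints 2019.06.21
        System.out.println(indexSuffix(1561075199999L)); // prints 2019.06.20
    }
}
```

Records on either side of midnight UTC go to different daily indices, so a batch that seems to be missing may be worth looking for under the previous day's index name as well.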