I have tried by providing opType to elasticsearch builder, I am getting an error message "document already exists" on my console, but it still updates the value in elasticsearch
val jsonString = write(record) val rqst: IndexRequest = Requests.indexRequest .index(parameter.get("esIndexName")) .`type`(parameter.get("esIndexType")) .id(record.getApi_key + "_" + record.getOrder_id) .source(jsonString, XContentType.JSON) .opType(OpType.CREATE) On Mon, Feb 10, 2020 at 1:42 PM Itamar Syn-Hershko < ita...@bigdataboutique.com> wrote: > Hi ApoorvK, > > Elasticsearch supports "create" mode while indexing. By default indexing > will overwrite documents with a the same ID, but you can tell ES to refuse > overwriting. See op_type in > https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-query-params > . > > Looking at the Elasticsearch Sink, it doesn't seem like it's implemented > currently, but it should be relatively easy to add. > > On Mon, Feb 10, 2020 at 9:26 AM ApoorvK <apoorv.upadh...@razorpay.com> > wrote: > >> Team, >> Presently I have added elasticsearch as a sink to a stream and inserting >> the >> json data, the problem is when I restore the application in case of crash >> it >> reprocess the data in between (meanwhile a backend application updates the >> document in ES) and flink reinsert the document in ES and all update to ES >> are lost . >> >> I am trying for a update or insert in case document not found or do not >> insert if document is already there. >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> > > > -- > > [image: logo] <https://bigdataboutique.com/> > Itamar Syn-Hershko > CTO, Founder > > ita...@bigdataboutique.com > https://bigdataboutique.com > <https://www.linkedin.com/in/itamar-syn-hershko-78b25013> > <https://twitter.com/synhershko> > <https://www.youtube.com/channel/UCBHr7lM2u6SCWPJvcKug-Yg> >