When building the request, you should build an UpdateRequest, like the following snippet:
import org.elasticsearch.action.update.UpdateRequest import org.elasticsearch.common.xcontent.XContentType val doc: String = ??? val targetIndex: String = ??? val indexType: Option[String] = ??? new UpdateRequest() .index(targetIndex) .`type`(indexType.getOrElse("_doc")) .id(id) .upsert(doc, XContentType.JSON) .doc(doc, XContentType.JSON) .docAsUpsert(true) I'm not entirely sure if you need both "doc" and "upsert" fields, as I think this depends on the Elasticsearch you're using. ________________________________ De: ApoorvK <apoorv.upadh...@razorpay.com> Enviat el: dilluns, 10 de febrer de 2020 15:33 Per a: user@flink.apache.org <user@flink.apache.org> Tema: Flink Elasticsearch upsert document in ES Team, Presently I have added elasticsearch as a sink to a stream and inserting the json data, the problem is when I restore the application in case of crash it reprocess the data in between (meanwhile a backend application updates the document in ES) and flink reinsert the document in ES and all update to ES are lost . I am trying for a update or insert in case document not found or do not insert if document is already there. I have tried by providing opType to elasticsearch builder, I am getting an error message "document already exists" on my console, but it still updates the value in elasticsearch val jsonString = write(record) val rqst: IndexRequest = Requests.indexRequest .index(parameter.get("esIndexName")) .`type`(parameter.get("esIndexType")) .id(record.getApi_key + "_" + record.getOrder_id) .source(jsonString, XContentType.JSON) .opType(OpType.CREATE) -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ ________________________________ Este mensaje y sus adjuntos se dirigen exclusivamente a su destinatario, puede contener informaci?n privilegiada o confidencial y es para uso exclusivo de la persona o entidad de destino. Si no es usted. el destinatario indicado, queda notificado de que la lectura, utilizaci?n, divulgaci?n y/o copia sin autorizaci?n puede estar prohibida en virtud de la legislaci?n vigente. Si ha recibido este mensaje por error, le rogamos que nos lo comunique inmediatamente por esta misma v?a y proceda a su destrucci?n. The information contained in this transmission is privileged and confidential information intended only for the use of the individual or entity named above. If the reader of this message is not the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. If you have received this transmission in error, do not read it. Please immediately reply to the sender that you have received this communication in error and then delete it. Esta mensagem e seus anexos se dirigem exclusivamente ao seu destinat?rio, pode conter informa??o privilegiada ou confidencial e ? para uso exclusivo da pessoa ou entidade de destino. Se n?o ? vossa senhoria o destinat?rio indicado, fica notificado de que a leitura, utiliza??o, divulga??o e/ou c?pia sem autoriza??o pode estar proibida em virtude da legisla??o vigente. Se recebeu esta mensagem por erro, rogamos-lhe que nos o comunique imediatamente por esta mesma via e proceda a sua destrui??o