When building the request, use an UpdateRequest rather than an IndexRequest,
as in the following snippet:
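
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType

val doc: String = ???                 // JSON document to write
val id: String = ???                  // document id, stable across replays
val targetIndex: String = ???
val indexType: Option[String] = ???

new UpdateRequest()
  .index(targetIndex)
  .`type`(indexType.getOrElse("_doc"))
  .id(id)
  .upsert(doc, XContentType.JSON)     // document to index when the id does not exist yet
  .doc(doc, XContentType.JSON)        // merged into the document when it already exists
  .docAsUpsert(true)                  // use "doc" itself as the upsert document
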
I'm not entirely sure whether you need both the "doc" and "upsert" fields; I
think this depends on the Elasticsearch version you're using.
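
In case it helps, here is a minimal sketch of the "insert only if absent" variant you describe. It is not tested against your setup. The object name UpsertSketch and the helper insertIfAbsent are made up for illustration, and I'm assuming a client version where UpdateRequest still accepts a type (6.x/7.x) and the default detect_noop behaviour, under which an empty partial update leaves an existing document untouched:

```scala
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType

// Hypothetical helper, not part of Flink or Elasticsearch.
object UpsertSketch {

  // Builds an update request that indexes `doc` only when no document
  // with this id exists; an existing document is left as it is.
  def insertIfAbsent(index: String, docType: String, id: String, doc: String): UpdateRequest =
    new UpdateRequest()
      .index(index)
      .`type`(docType)
      .id(id)
      .doc("{}", XContentType.JSON)    // empty partial update: nothing to merge into an existing document
      .upsert(doc, XContentType.JSON)  // full document, used only when the id is missing
}
```

You would then pass the result to the indexer in your sink function instead of the IndexRequest. Note that docAsUpsert is left at its default (false) here, so the "upsert" document is the one that gets inserted, not "doc".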
________________________________
From: ApoorvK <[email protected]>
Sent: Monday, 10 February 2020 15:33
To: [email protected] <[email protected]>
Subject: Flink Elasticsearch upsert document in ES

Team,

I have added Elasticsearch as a sink to a stream and am inserting JSON data.
The problem is that when I restore the application after a crash, it
reprocesses the data in between (meanwhile a backend application updates the
documents in ES), and Flink reinserts the documents in ES, so all those
updates are lost.

I am trying to do an update-or-insert: insert if the document is not found,
and do not insert if the document is already there.

I have tried setting opType on the Elasticsearch request builder. I get the
error message "document already exists" on my console, but the value in
Elasticsearch is still updated:

val jsonString = write(record)
val rqst: IndexRequest = Requests.indexRequest
  .index(parameter.get("esIndexName"))
  .`type`(parameter.get("esIndexType"))
  .id(record.getApi_key + "_" + record.getOrder_id)
  .source(jsonString, XContentType.JSON)
  .opType(OpType.CREATE)