Team,

I have added Elasticsearch as a sink to a stream and am inserting JSON data
into it. The problem is that when I restore the application after a crash, it
reprocesses the data in between. Meanwhile a backend application has updated
those documents in ES, so when Flink reinserts them, all of those updates
are lost.

I am trying to do an update-or-insert: insert the document if it is not
found, and do not reinsert it if it is already there.

I have tried setting opType on the Elasticsearch index request. I get a
"document already exists" error message on my console, but the value in
Elasticsearch is still updated:
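// Imports assume an Elasticsearch 5.3+/6.x Java client.
import org.elasticsearch.action.DocWriteRequest.OpType
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.xcontent.XContentType

val jsonString = write(record)  // serialize the record to a JSON string
val rqst: IndexRequest = Requests.indexRequest()
  .index(parameter.get("esIndexName"))
  .`type`(parameter.get("esIndexType"))
  .id(record.getApi_key + "_" + record.getOrder_id)
  .source(jsonString, XContentType.JSON)
  .opType(OpType.CREATE)  // should fail if a document with this id exists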
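
For reference, the behaviour I am after could perhaps be expressed with an
UpdateRequest instead of an IndexRequest. This is only a minimal sketch, not
something I have verified against my job. It assumes an Elasticsearch 6.x
client and a connector version whose RequestIndexer accepts UpdateRequest.
The object and method names (UpsertSketch, insertIfAbsent, updateOrInsert)
are made up for illustration.

```scala
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType

object UpsertSketch {
  // Hypothetical helper: create the document if it is missing and leave an
  // existing document as it is. The empty partial doc has nothing to merge,
  // so an existing document should be treated as a no-op (assumption: the
  // default noop detection is enabled).
  def insertIfAbsent(index: String, docType: String, id: String, json: String): UpdateRequest =
    new UpdateRequest(index, docType, id)
      .doc("{}", XContentType.JSON)     // partial update applied when the doc exists
      .upsert(json, XContentType.JSON)  // full document used only when it does not exist

  // Hypothetical helper: merge the fields into an existing document, or
  // insert the document when it is missing.
  def updateOrInsert(index: String, docType: String, id: String, json: String): UpdateRequest =
    new UpdateRequest(index, docType, id)
      .doc(json, XContentType.JSON)
      .docAsUpsert(true)                // use the partial doc as the upsert doc
}
```

In the sink function this would replace the IndexRequest above, for example
UpsertSketch.insertIfAbsent(parameter.get("esIndexName"),
parameter.get("esIndexType"), record.getApi_key + "_" + record.getOrder_id,
jsonString).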