I tried setting opType on the Elasticsearch request builder. I get a
"document already exists" error message on my console, but the value in
Elasticsearch is still updated.

val jsonString = write(record)
val rqst: IndexRequest = Requests.indexRequest
  .index(parameter.get("esIndexName"))
  .`type`(parameter.get("esIndexType"))
  .id(record.getApi_key + "_" + record.getOrder_id)
  .source(jsonString, XContentType.JSON)
  .opType(OpType.CREATE)
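
For reference, this is the behaviour I expected from create mode, written as
a toy in-memory sketch. ToyIndex, OpType, Index, Create and write are
hypothetical names of my own; this is not the Elasticsearch or Flink API and
it says nothing about what the sink does internally. It only models "index
overwrites, create refuses when the ID exists".

```scala
import scala.collection.mutable

// Toy stand-in for an index; NOT the Elasticsearch client API.
sealed trait OpType
case object Index extends OpType  // default mode: overwrite on same ID
case object Create extends OpType // refuse the write if the ID already exists

final class ToyIndex {
  private val docs = mutable.Map.empty[String, String]

  /** Right(id) on success, Left(reason) when a create is rejected. */
  def write(id: String, source: String, opType: OpType = Index): Either[String, String] =
    opType match {
      case Create if docs.contains(id) =>
        // Rejected: the stored document is left untouched.
        Left(s"document already exists: $id")
      case _ =>
        docs(id) = source
        Right(id)
    }

  def get(id: String): Option[String] = docs.get(id)
}

object OpTypeDemo extends App {
  val index = new ToyIndex

  // First write from the stream creates the document.
  index.write("key_1", """{"status":"new"}""", Create)

  // A backend application updates it (plain index, so it overwrites).
  index.write("key_1", """{"status":"updated by backend"}""")

  // After a restore the stream replays the same record in create mode.
  val replay = index.write("key_1", """{"status":"new"}""", Create)

  assert(replay.isLeft)
  assert(index.get("key_1").contains("""{"status":"updated by backend"}"""))
}
```

In this model the replayed create is rejected and the backend's update
survives, which is the outcome I am after.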


On Mon, Feb 10, 2020 at 1:42 PM Itamar Syn-Hershko <
ita...@bigdataboutique.com> wrote:

> Hi ApoorvK,
>
> Elasticsearch supports a "create" mode while indexing. By default, indexing
> overwrites a document with the same ID, but you can tell ES to refuse to
> overwrite it. See op_type in
> https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-query-params
> .
>
> Looking at the Elasticsearch Sink, it doesn't seem like it's implemented
> currently, but it should be relatively easy to add.
>
> On Mon, Feb 10, 2020 at 9:26 AM ApoorvK <apoorv.upadh...@razorpay.com>
> wrote:
>
>> Team,
>> I have added Elasticsearch as a sink to a stream and am inserting JSON
>> data. The problem is that when I restore the application after a crash,
>> it reprocesses the data in between (meanwhile, a backend application has
>> updated the documents in ES), so Flink reinserts the documents and all of
>> those updates in ES are lost.
>>
>> I am looking for a way to update the document, or insert it if it is not
>> found, or else to skip the insert when the document is already there.
>
>
> --
>
> Itamar Syn-Hershko
> CTO, Founder
>
> ita...@bigdataboutique.com
> https://bigdataboutique.com
>
