From how I understand it:

https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html#elasticsearch-sinks-and-fault-tolerance

> the Flink Elasticsearch Sink guarantees at-least-once delivery of action
> requests to Elasticsearch clusters. It does so by waiting for all pending
> action requests in the BulkProcessor at the time of checkpoints. This
> effectively assures that all requests before the checkpoint was triggered
> have been successfully acknowledged by Elasticsearch, before proceeding to
> process more records sent to the sink.


So I am thinking:


   - If I put a .map(json -> json.set("_id", ElasticsearchId.generate())) in
   front of the Elasticsearch sink
   - If I have an ActionRequestFailureHandler that drops any ID conflicts on
   the floor

Would this give me exactly-once output to Elasticsearch, as the
BulkProcessor's checkpoint would include the "_id" and thus in the event of
a recovery the duplicates would be detected?
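
To make the idea concrete, here is a minimal sketch in plain Java with no
Flink or Elasticsearch dependency. Everything in it is an assumption for
illustration: deterministicId is a hypothetical stand-in for
ElasticsearchId.generate() and assumes the id is derived from the record
content (an id generated at random would differ when a record is replayed, so
no conflict would be seen), and FakeIndex is a hypothetical in-memory stand-in
for an index written with "create" semantics, where a second write to an
existing id is a conflict rather than an overwrite.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class IdempotentSinkSketch {

    // Hypothetical stand-in for ElasticsearchId.generate(): the id is derived
    // from the record content, so a replayed record maps to the same id.
    public static String deterministicId(String record) {
        return UUID.nameUUIDFromBytes(record.getBytes(StandardCharsets.UTF_8)).toString();
    }

    // Hypothetical in-memory stand-in for an index written with "create"
    // semantics: a second write with an existing id counts as a conflict.
    public static final class FakeIndex {
        private final Map<String, String> docs = new LinkedHashMap<>();
        private int conflictsDropped = 0;

        public void create(String id, String source) {
            // putIfAbsent returns the previous value when the id already exists
            if (docs.putIfAbsent(id, source) != null) {
                conflictsDropped++; // what the failure handler would drop
            }
        }

        public int size() {
            return docs.size();
        }

        public int conflictsDropped() {
            return conflictsDropped;
        }
    }

    // Writes every record once, then replays the records that came after the
    // last checkpoint, as an at-least-once recovery would.
    public static FakeIndex runWithReplay(List<String> records, int checkpointedCount) {
        FakeIndex index = new FakeIndex();
        for (String r : records) {
            index.create(deterministicId(r), r);
        }
        for (String r : records.subList(checkpointedCount, records.size())) {
            index.create(deterministicId(r), r);
        }
        return index;
    }

    public static void main(String[] args) {
        FakeIndex index = runWithReplay(Arrays.asList("a", "b", "c", "d"), 2);
        // prints "4 documents, 2 conflicts dropped"
        System.out.println(index.size() + " documents, "
                + index.conflictsDropped() + " conflicts dropped");
    }
}
```

In this sketch the two replayed records hit existing ids and are dropped, so
the index ends up with each record exactly once.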

Or is there some additional concern I need to be aware of?

Thanks

-stephenc
