Hi Rahul,

Right. There are no workarounds as far as I know.
Regards,
Roman

On Mon, Apr 12, 2021 at 9:00 PM Rahul Patwari <rahulpatwari8...@gmail.com> wrote:
>
> Hi Roman, Arvid,
>
> So, to achieve an "at least once" guarantee, automatic restart of Flink
> currently has to be disabled?
> Is there any workaround to get "at least once" semantics with Flink's
> automatic restarts in this case?
>
> Regards,
> Rahul
>
> On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi,
>>
>> Thanks for the clarification.
>>
>> > Other than managing offsets externally, are there any other ways to
>> > guarantee "at least once" processing without enabling checkpointing?
>>
>> That's currently not possible, at least with the default connector.
>>
>> Regards,
>> Roman
>>
>> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
>> <rahulpatwari8...@gmail.com> wrote:
>> >
>> > Hi Roman,
>> >
>> > Thanks for the reply.
>> > By internal restarts, I meant the automatic restore of the Flink job
>> > from a failure - for example, pipeline restarts when the fixed-delay or
>> > failure-rate restart strategy is configured.
>> >
>> > Quoting the documentation in this link - Configuring Kafka Consumer
>> > start position configuration:
>> >
>> >> Note that these start position configuration methods do not affect the
>> >> start position when the job is automatically restored from a failure
>> >
>> > It seems that there will be data loss even when offsets are managed
>> > externally if the pipeline restarts due to a failure, say, an
>> > exception. On the other hand, when the pipeline is stopped and
>> > resubmitted (say, for an upgrade), there won't be any data loss, as
>> > offsets are retrieved from the external store and configured while
>> > starting the Kafka consumer.
>> >
>> > We do not want to enable checkpointing as the pipeline is stateless. We
>> > have deduplication logic in the pipeline and the processing is
>> > idempotent.
>> >
>> > Other than managing offsets externally, are there any other ways to
>> > guarantee "at least once" processing without enabling checkpointing?
>> >
>> > Thanks,
>> > Rahul
>> >
>> > On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan <ro...@apache.org> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could you please explain what you mean by internal restarts?
>> >>
>> >> If you commit offsets or timestamps from the sink after emitting
>> >> records to the external system, then there should be no data loss.
>> >> Otherwise (if you commit offsets earlier), you have to persist
>> >> in-flight records to avoid data loss (i.e. enable checkpointing).
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>> >> <rahulpatwari8...@gmail.com> wrote:
>> >> >
>> >> > Hello,
>> >> >
>> >> > Context:
>> >> >
>> >> > We have a stateless Flink pipeline which reads from Kafka topics.
>> >> > The pipeline has a windowing operator (used only to introduce a
>> >> > delay in processing records) and Async I/O operators (used for
>> >> > lookup/enrichment).
>> >> >
>> >> > "At least once" processing semantics is needed for the pipeline to
>> >> > avoid data loss.
>> >> >
>> >> > Checkpointing is disabled, and we currently depend on the Kafka
>> >> > consumer's auto offset commit for fault tolerance.
>> >> >
>> >> > As auto offset commit indicates that "the record was successfully
>> >> > read" rather than "the record was successfully processed", there
>> >> > will be data loss if a restart happens after the offset is committed
>> >> > to Kafka but before the record is successfully processed by the
>> >> > Flink pipeline, as the record is NOT replayed when the pipeline is
>> >> > restarted.
>> >> >
>> >> > Checkpointing can solve this problem. But since the pipeline is
>> >> > stateless, we do not want to use checkpointing, which would persist
>> >> > all the records in the windowing operator and the in-flight Async
>> >> > I/O calls.
>> >> >
>> >> > Question:
>> >> >
>> >> > We are looking for other ways to guarantee "at least once"
>> >> > processing without checkpointing. One such way is to manage Kafka
>> >> > offsets externally.
>> >> >
>> >> > We can maintain the offsets of each partition of each topic in
>> >> > Cassandra (or maintain a timestamp such that all records with
>> >> > timestamps less than it have been successfully processed) and
>> >> > configure the Kafka consumer start position with
>> >> > setStartFromTimestamp() or setStartFromSpecificOffsets().
>> >> >
>> >> > This helps when the pipeline is manually restarted (say, the
>> >> > JobManager pod is restarted). But how can we avoid data loss in the
>> >> > case of internal restarts?
>> >> >
>> >> > Has anyone used this approach?
>> >> > What are other ways to guarantee "at least once" processing without
>> >> > checkpointing for a stateless Flink pipeline?
>> >> >
>> >> > Thanks,
>> >> > Rahul
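
For reference, a minimal sketch of the externally managed offsets approach
described above, using the legacy FlinkKafkaConsumer API. The topic name,
group id, broker address and the loadOffsetsFromExternalStore() helper are
illustrative assumptions rather than details from the thread; the helper
stands in for a Cassandra lookup of the last fully processed offset per
partition.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class ExternallyManagedOffsetsJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is intentionally left disabled, as in the setup described above.

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");  // assumed broker address
        props.setProperty("group.id", "stateless-pipeline");   // assumed group id

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);

        // Offsets read back from the external store (Cassandra in the thread).
        Map<KafkaTopicPartition, Long> specificOffsets = loadOffsetsFromExternalStore();
        consumer.setStartFromSpecificOffsets(specificOffsets);
        // Or, if a "safe" timestamp is tracked instead of per-partition offsets:
        // consumer.setStartFromTimestamp(lastFullyProcessedTimestampMillis);

        // Per the documentation quoted in the thread, these start-position settings
        // apply when the job is (re)submitted, NOT when it is automatically restored
        // from a failure.
        env.addSource(consumer)
           .print();  // windowing, async I/O, dedup and the real sink omitted

        env.execute("stateless-pipeline");
    }

    private static Map<KafkaTopicPartition, Long> loadOffsetsFromExternalStore() {
        // Hypothetical placeholder: the real job would query Cassandra for the last
        // offset per (topic, partition) that was confirmed as fully processed.
        Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new KafkaTopicPartition("events", 0), 42L);
        return offsets;
    }
}

As the quoted documentation says, this only takes effect when the job is
submitted or manually resubmitted, not when it is automatically restored from
a failure - which is exactly the gap discussed above. Any partition missing
from the map falls back to the committed group offsets.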
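
A rough sketch of the sink-side commit Roman suggests earlier in the thread:
persist the offset (or timestamp) only after the record has been handed to the
external system, so anything not yet confirmed is replayed on restart. The
EnrichedEvent type and the two placeholder methods are hypothetical, standing
in for the real downstream client and the Cassandra-backed offset store.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Hypothetical event type carrying its Kafka coordinates alongside the payload. */
class EnrichedEvent implements java.io.Serializable {
    String topic;
    int partition;
    long offset;
    String payload;
}

public class CommitAfterEmitSink extends RichSinkFunction<EnrichedEvent> {

    @Override
    public void open(Configuration parameters) {
        // e.g. open connections to the downstream system and to Cassandra here
    }

    @Override
    public void invoke(EnrichedEvent value, Context context) throws Exception {
        writeToExternalSystem(value.payload);                       // 1. emit the record first
        persistOffset(value.topic, value.partition, value.offset);  // 2. only then record progress
    }

    private void writeToExternalSystem(String payload) {
        // Placeholder: write to the real downstream system.
    }

    private void persistOffset(String topic, int partition, long offset) {
        // Placeholder: upsert (topic, partition) -> offset in Cassandra.
    }
}

Committing per record keeps the replay window small at the cost of one
external write per record; batching the offset updates reduces writes but
means more records are replayed (and deduplicated) after a restart, which the
idempotent processing described above should tolerate.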