Hi,

Thanks for the clarification.
> Other than managing offsets externally, are there any other ways to guarantee
> "at least once" processing without enabling checkpointing?

That's currently not possible, at least with the default connector.

Regards,
Roman

On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari <rahulpatwari8...@gmail.com> wrote:
>
> Hi Roman,
>
> Thanks for the reply.
> This is what I meant by internal restarts - automatic restore of the Flink job
> from a failure. For example, the pipeline restarts when the Fixed Delay or
> Failure Rate restart strategies are configured.
>
> Quoting the documentation in this link - Configuring Kafka Consumer start
> position configuration:
>
>> Note that these start position configuration methods do not affect the start
>> position when the job is automatically restored from a failure
>
> It seems that there will be data loss, even when offsets are managed
> externally, if the pipeline restarts due to a failure, say, an exception.
> On the other hand, when the pipeline is stopped and resubmitted (say, for an
> upgrade), there won't be any data loss, as offsets are retrieved from an
> external store and configured while starting the Kafka consumer.
>
> We do not want to enable checkpointing as the pipeline is stateless. We have
> deduplication logic in the pipeline and the processing is idempotent.
>
> Other than managing offsets externally, are there any other ways to guarantee
> "at least once" processing without enabling checkpointing?
>
> Thanks,
> Rahul
>
> On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi,
>>
>> Could you please explain what you mean by internal restarts?
>>
>> If you commit offsets or timestamps from the sink after emitting records
>> to the external system, then there should be no data loss.
>> Otherwise (if you commit offsets earlier), you have to persist
>> in-flight records to avoid data loss (i.e. enable checkpointing).
>>
>> Regards,
>> Roman
>>
>> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>> <rahulpatwari8...@gmail.com> wrote:
>> >
>> > Hello,
>> >
>> > Context:
>> >
>> > We have a stateless Flink pipeline which reads from Kafka topics.
>> > The pipeline has a windowing operator (used only for introducing a delay
>> > in processing records) and Async I/O operators (used for lookup/enrichment).
>> >
>> > "At least once" processing semantics is needed for the pipeline to avoid
>> > data loss.
>> >
>> > Checkpointing is disabled, and we currently depend on the auto offset
>> > commit of the Kafka consumer for fault tolerance.
>> >
>> > As an auto offset commit indicates that "the record was successfully read"
>> > rather than "the record was successfully processed", there will be data
>> > loss if there is a restart after an offset has been committed to Kafka but
>> > before the record has been successfully processed by the Flink pipeline,
>> > as the record is NOT replayed when the pipeline is restarted.
>> >
>> > Checkpointing can solve this problem. But since the pipeline is stateless,
>> > we do not want to use checkpointing, which would persist all the records
>> > in the windowing operator and in-flight Async I/O calls.
>> >
>> > Question:
>> >
>> > We are looking for other ways to guarantee "at least once" processing
>> > without checkpointing. One such way is to manage Kafka offsets externally.
>> >
>> > We can maintain the offsets of each partition of each topic in Cassandra
>> > (or maintain a timestamp, where all records with timestamps less than this
>> > timestamp are successfully processed) and configure the Kafka consumer
>> > start position with setStartFromTimestamp() or setStartFromSpecificOffsets().
>> >
>> > This will be helpful if the pipeline is manually restarted (say, the
>> > JobManager pod is restarted). But how do we avoid data loss in case of
>> > internal restarts?
>> >
>> > Has anyone used this approach?
>> > What are other ways to guarantee "at least once" processing without
>> > checkpointing for a stateless Flink pipeline?
>> >
>> > Thanks,
>> > Rahul
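A minimal sketch of the externally managed start position discussed in this
thread, assuming the legacy FlinkKafkaConsumer connector available at the time
(setStartFromSpecificOffsets() and setStartFromTimestamp() are documented
methods of that connector); loadOffsetsFromCassandra() is a hypothetical
helper standing in for whatever query returns the last fully processed offset
per partition from the external store:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

    public class ExternalOffsetsJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092");
            props.setProperty("group.id", "stateless-pipeline");

            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("myTopic", new SimpleStringSchema(), props);

            // Offsets previously recorded by the sink in the external store
            // (e.g. Cassandra). loadOffsetsFromCassandra() is a placeholder,
            // not a Flink or Kafka API.
            Map<KafkaTopicPartition, Long> startOffsets = loadOffsetsFromCassandra("myTopic");

            // Start from the externally stored offsets. As the documentation
            // quoted above notes, this only affects the start position when
            // the job is (re)submitted, not when it is automatically restored
            // from a failure.
            consumer.setStartFromSpecificOffsets(startOffsets);
            // Alternatively, if only a "safe" timestamp is tracked externally:
            // consumer.setStartFromTimestamp(safeTimestampMillis);

            DataStream<String> stream = env.addSource(consumer);
            // ... windowing, Async I/O enrichment, and a sink that writes the
            // highest fully processed offset per partition back to the external
            // store only after the records have been emitted downstream ...

            env.execute("stateless-pipeline");
        }

        // Hypothetical placeholder: a real job would query Cassandra for the
        // offsets the sink last recorded as fully processed.
        private static Map<KafkaTopicPartition, Long> loadOffsetsFromCassandra(String topic) {
            Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
            offsets.put(new KafkaTopicPartition(topic, 0), 23L);
            offsets.put(new KafkaTopicPartition(topic, 1), 31L);
            return offsets;
        }
    }

As both the thread and the connector documentation point out, this covers
manual restarts and resubmissions (e.g. upgrades), but it does not change the
start position on automatic restore from a failure, which is exactly the gap
being discussed above.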