Hi Rahul, could you please help me understand how the "at least once" guarantee would work without checkpointing in your case?
Let's say you read records A, B, C. You use a window to delay processing, so
let's say A passes through and B, C are still in the window waiting for the
trigger.

Do you want to auto-commit the offset of A after it has been written to the
sink? If so, what's keeping you from attaching the Kafka offsets to A, B, C
and writing each record's offset when writing the record into the sink? (You
would probably need to wrap your sink function in a delegating sink
function.)

The way Flink does checkpointing is that it checkpoints the offset of C and
the state of the window (containing B, C) to avoid data loss. Why does that
not work for you? What state size do you expect?

Why do you need the window operator at all? Couldn't you just backpressure
on the async I/O by delaying the processing there? Then there would be no
need to change anything.

On Tue, Apr 13, 2021 at 10:00 AM Roman Khachatryan <ro...@apache.org> wrote:
> Hi Rahul,
>
> Right. There are no workarounds as far as I know.
>
> Regards,
> Roman
>
> On Mon, Apr 12, 2021 at 9:00 PM Rahul Patwari
> <rahulpatwari8...@gmail.com> wrote:
> >
> > Hi Roman, Arvid,
> >
> > So, to achieve the "at least once" guarantee, currently, automatic
> > restart of Flink should be disabled?
> > Is there any workaround to get "at least once" semantics with Flink
> > automatic restarts in this case?
> >
> > Regards,
> > Rahul
> >
> > On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan <ro...@apache.org>
> > wrote:
> >>
> >> Hi,
> >>
> >> Thanks for the clarification.
> >>
> >> > Other than managing offsets externally, are there any other ways to
> >> > guarantee "at least once" processing without enabling checkpointing?
> >>
> >> That's currently not possible, at least with the default connector.
> >>
> >> Regards,
> >> Roman
> >>
> >> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
> >> <rahulpatwari8...@gmail.com> wrote:
> >> >
> >> > Hi Roman,
> >> >
> >> > Thanks for the reply.
> >> >
> >> > This is what I meant by internal restarts - the automatic restore of
> >> > the Flink job from a failure. For example, the pipeline restarts when
> >> > the Fixed Delay or Failure Rate restart strategies are configured.
> >> >
> >> > Quoting the documentation in this link - Configuring Kafka Consumer
> >> > start position configuration:
> >> >
> >> >> Note that these start position configuration methods do not affect
> >> >> the start position when the job is automatically restored from a
> >> >> failure
> >> >
> >> > It seems that there will be data loss, even when offsets are managed
> >> > externally, when there are pipeline restarts due to a failure, say,
> >> > an exception. On the other hand, when the pipeline is stopped and
> >> > resubmitted (say, for an upgrade), there won't be any data loss, as
> >> > offsets are retrieved from an external store and configured while
> >> > starting the Kafka consumer.
> >> >
> >> > We do not want to enable checkpointing as the pipeline is stateless.
> >> > We have deduplication logic in the pipeline and the processing is
> >> > idempotent.
> >> >
> >> > Other than managing offsets externally, are there any other ways to
> >> > guarantee "at least once" processing without enabling checkpointing?
> >> >
> >> > Thanks,
> >> > Rahul
> >> >
> >> > On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan <ro...@apache.org>
> >> > wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> Could you please explain what you mean by internal restarts?
> >> >>
> >> >> If you commit offsets or timestamps from the sink after emitting
> >> >> records to the external system, then there should be no data loss.
> >> >> Otherwise (if you commit offsets earlier), you have to persist
> >> >> in-flight records to avoid data loss (i.e. enable checkpointing).
> >> >>
> >> >> Regards,
> >> >> Roman
> >> >>
> >> >> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
> >> >> <rahulpatwari8...@gmail.com> wrote:
> >> >> >
> >> >> > Hello,
> >> >> >
> >> >> > Context:
> >> >> >
> >> >> > We have a stateless Flink pipeline which reads from Kafka topics.
> >> >> > The pipeline has a windowing operator (used only for introducing
> >> >> > a delay in processing records) and Async I/O operators (used for
> >> >> > lookup/enrichment).
> >> >> >
> >> >> > "At least once" processing semantics is needed for the pipeline
> >> >> > to avoid data loss.
> >> >> >
> >> >> > Checkpointing is disabled and we currently depend on the auto
> >> >> > offset commit of the Kafka consumer for fault tolerance.
> >> >> >
> >> >> > As an auto offset commit indicates that "the record was
> >> >> > successfully read" rather than "the record was successfully
> >> >> > processed", there will be data loss if there is a restart after
> >> >> > the offset has been committed to Kafka but before the record has
> >> >> > been successfully processed by the Flink pipeline, as the record
> >> >> > is NOT replayed when the pipeline is restarted.
> >> >> >
> >> >> > Checkpointing can solve this problem. But, since the pipeline is
> >> >> > stateless, we do not want to use checkpointing, which would
> >> >> > persist all the records in the windowing operator and in-flight
> >> >> > Async I/O calls.
> >> >> >
> >> >> > Question:
> >> >> >
> >> >> > We are looking for other ways to guarantee "at least once"
> >> >> > processing without checkpointing. One such way is to manage
> >> >> > Kafka offsets externally.
> >> >> >
> >> >> > We can maintain the offsets of each partition of each topic in
> >> >> > Cassandra (or maintain a timestamp, where all records with
> >> >> > timestamps less than this timestamp are successfully processed)
> >> >> > and configure the Kafka consumer start position -
> >> >> > setStartFromTimestamp() or setStartFromSpecificOffsets().
> >> >> >
> >> >> > This will be helpful if the pipeline is manually restarted (say,
> >> >> > the JobManager pod is restarted). But how do we avoid data loss
> >> >> > in the case of internal restarts?
> >> >> >
> >> >> > Has anyone used this approach?
> >> >> > What are other ways to guarantee "at least once" processing
> >> >> > without checkpointing for a stateless Flink pipeline?
> >> >> >
> >> >> > Thanks,
> >> >> > Rahul
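The "delegating sink function" suggested at the top of the thread can be
sketched without any Flink code: each record carries the Kafka (partition,
offset) it came from, and the wrapper commits that offset to an external
store only AFTER the inner sink has accepted the record. This is a minimal,
Flink-free Python sketch of the pattern; all names are illustrative
(`DelegatingSink`, `invoke`) and in a real job you would wrap Flink's Java
`SinkFunction` instead.

```python
# Sketch of the delegating-sink idea: commit an offset only after the
# wrapped sink has successfully written the record, so a crash between
# read and write replays the record (at-least-once, not exactly-once).
# Names and the dict-based "store" are illustrative, not Flink/Kafka API.

class DelegatingSink:
    def __init__(self, inner_sink, offset_store):
        self.inner_sink = inner_sink      # the real sink, e.g. a DB writer
        self.offset_store = offset_store  # external store, e.g. Cassandra

    def invoke(self, record):
        value, partition, offset = record
        self.inner_sink(value)  # write first ...
        # ... then record progress: offsets <= this one are fully processed
        current = self.offset_store.get(partition, -1)
        self.offset_store[partition] = max(current, offset)


written = []
offsets = {}
sink = DelegatingSink(written.append, offsets)

# Records A, B, C read from partition 0 at offsets 0, 1, 2.
for rec in [("A", 0, 0), ("B", 0, 1), ("C", 0, 2)]:
    sink.invoke(rec)

print(written)  # ['A', 'B', 'C']
print(offsets)  # {0: 2}
```

If the job dies after `invoke` ran for A but before B and C left the window,
the store still points at A's offset, so B and C are re-read on restart. A
real implementation would also need the commit to the external store to be
at least as durable as the sink write itself.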
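The restart path Rahul describes (Cassandra-backed offsets plus
`setStartFromSpecificOffsets()`) amounts to: on (re)start, read the last
fully processed offset per partition from the external store and resume from
the NEXT offset. A hedged, in-memory sketch of just that computation - the
store, partitions, and `starting_offsets` helper are stand-ins, not
Flink/Kafka API:

```python
# On restart, resume one past the last fully processed offset so that
# anything read-but-not-written before the crash is replayed; unseen
# partitions start from offset 0. This gives at-least-once behaviour and
# relies on the downstream deduplication/idempotence the thread mentions.

def starting_offsets(offset_store, partitions):
    # -1 sentinel means "never processed anything from this partition".
    return {p: offset_store.get(p, -1) + 1 for p in partitions}


# Before the crash, partition 0 was fully processed through offset 1; the
# consumer's auto-commit had already advanced past offset 2, which was read
# but never written. The external store still says 1, so 2 is replayed.
store = {0: 1}
resume = starting_offsets(store, [0, 1])
print(resume)  # {0: 2, 1: 0}
```

As the quoted documentation notes, this only helps for manual restarts: on
an automatic restore from failure, Flink's Kafka consumer ignores the start
position configuration, which is exactly the gap the thread concludes cannot
be closed without checkpointing (or with automatic restarts disabled).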