Hi,

Thanks for the clarification.

> Other than managing offsets externally, Are there any other ways to guarantee 
> "at least once" processing without enabling checkpointing?

That's currently not possible, at least with the default connector.

Regards,
Roman

On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
<rahulpatwari8...@gmail.com> wrote:
>
> Hi Roman,
>
> Thanks for the reply.
> This is what I meant by Internal restarts - Automatic restore of Flink Job 
> from a failure. For example, pipeline restarts when Fixed delay or Failure 
> Rate restart strategies are configured.
>
> Quoting documentation in this link - Configuring Kafka Consumer start 
> position configuration
>
>> Note that these start position configuration methods do not affect the start 
>> position when the job is automatically restored from a failure
>
>
>
> It seems that there will be data loss even when offsets are managed 
> externally when there are pipeline restarts due to a failure, say, an 
> exception. On the other hand, when the pipeline is stopped and 
> resubmitted(say, an upgrade), there won't be any data loss as offsets are 
> retrieved from an external store and configured while starting Kafka Consumer.
>
> We do not want to enable checkpointing as the pipeline is stateless. We have 
> Deduplication logic in the pipeline and the processing is idempotent.
>
> Other than managing offsets externally, Are there any other ways to guarantee 
> "at least once" processing without enabling checkpointing?
>
> Thanks,
> Rahul
>
> On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi,
>>
>> Could you please explain what you mean by internal restarts?
>>
>> If you commit offsets or timestamps from sink after emitting records
>> to the external system then there should be no data loss.
>> Otherwise (if you commit offsets earlier), you have to persist
>> in-flight records to avoid data loss (i.e. enable checkpointing).
>>
>> Regards,
>> Roman
>>
>> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>> <rahulpatwari8...@gmail.com> wrote:
>> >
>> > Hello,
>> >
>> > Context:
>> >
>> > We have a stateless Flink Pipeline which reads from Kafka topics.
>> > The pipeline has a Windowing operator(Used only for introducing a delay in 
>> > processing records) and AsyncI/O operators (used for Lookup/Enrichment).
>> >
>> > "At least Once" Processing semantics is needed for the pipeline to avoid 
>> > data loss.
>> >
>> > Checkpointing is disabled and we are dependent on the auto offset commit 
>> > of Kafka consumer for fault tolerance currently.
>> >
>> > As auto offset commit indicates that "the record is successfully read", 
>> > instead of "the record is successfully processed", there will be data loss 
>> > if there is a restart when the offset is committed to Kafka but not 
>> > successfully processed by the Flink Pipeline, as the record is NOT 
>> > replayed again when the pipeline is restarted.
>> >
>> > Checkpointing can solve this problem. But, since the pipeline is 
>> > stateless, we do not want to use checkpointing, which will persist all the 
>> > records in Windowing Operator and in-flight Async I/O calls.
>> >
>> > Question:
>> >
>> > We are looking for other ways to guarantee "at least once" processing 
>> > without checkpointing. One such way is to manage Kafka Offsets Externally.
>> >
>> > We can maintain offsets of each partition of each topic in Cassandra(or 
>> > maintain timestamp, where all records with timestamps less than this 
>> > timestamp are successfully processed) and configure Kafka consumer Start 
>> > Position - setStartFromTimestamp() or setStartFromSpecificOffsets()
>> >
>> > This will be helpful if the pipeline is manually restarted (say, 
>> > JobManager pod is restarted). But, how to avoid data loss in case of 
>> > internal restarts?
>> >
>> > Has anyone used this approach?
>> > What are other ways to guarantee "at least once" processing without 
>> > checkpointing for a stateless Flink pipeline?
>> >
>> > Thanks,
>> > Rahul

Reply via email to