Hi Rahul,

Right. There are no workarounds as far as I know.

Regards,
Roman

On Mon, Apr 12, 2021 at 9:00 PM Rahul Patwari
<rahulpatwari8...@gmail.com> wrote:
>
> Hi Roman, Arvid,
>
> So, to achieve "at least once" guarantee, currently, automatic restart of 
> Flink should be disabled?
> Is there any workaround to get "at least once" semantics with Flink Automatic 
> restarts in this case?
>
> Regards,
> Rahul
>
> On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi,
>>
>> Thanks for the clarification.
>>
>> > Other than managing offsets externally, Are there any other ways to 
>> > guarantee "at least once" processing without enabling checkpointing?
>>
>> That's currently not possible, at least with the default connector.
>>
>> Regards,
>> Roman
>>
>> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
>> <rahulpatwari8...@gmail.com> wrote:
>> >
>> > Hi Roman,
>> >
>> > Thanks for the reply.
>> > This is what I meant by Internal restarts - Automatic restore of Flink Job 
>> > from a failure. For example, pipeline restarts when Fixed delay or Failure 
>> > Rate restart strategies are configured.
>> >
>> > Quoting documentation in this link - Configuring Kafka Consumer start 
>> > position configuration
>> >
>> >> Note that these start position configuration methods do not affect the 
>> >> start position when the job is automatically restored from a failure
>> >
>> >
>> >
>> > It seems that there will be data loss even when offsets are managed 
>> > externally when there are pipeline restarts due to a failure, say, an 
>> > exception. On the other hand, when the pipeline is stopped and 
>> > resubmitted(say, an upgrade), there won't be any data loss as offsets are 
>> > retrieved from an external store and configured while starting Kafka 
>> > Consumer.
>> >
>> > We do not want to enable checkpointing as the pipeline is stateless. We 
>> > have Deduplication logic in the pipeline and the processing is idempotent.
>> >
>> > Other than managing offsets externally, Are there any other ways to 
>> > guarantee "at least once" processing without enabling checkpointing?
>> >
>> > Thanks,
>> > Rahul
>> >
>> > On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan <ro...@apache.org> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could you please explain what you mean by internal restarts?
>> >>
>> >> If you commit offsets or timestamps from sink after emitting records
>> >> to the external system then there should be no data loss.
>> >> Otherwise (if you commit offsets earlier), you have to persist
>> >> in-flight records to avoid data loss (i.e. enable checkpointing).
>> >>
>> >> Regards,
>> >> Roman
>> >>
>> >> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>> >> <rahulpatwari8...@gmail.com> wrote:
>> >> >
>> >> > Hello,
>> >> >
>> >> > Context:
>> >> >
>> >> > We have a stateless Flink Pipeline which reads from Kafka topics.
>> >> > The pipeline has a Windowing operator(Used only for introducing a delay 
>> >> > in processing records) and AsyncI/O operators (used for 
>> >> > Lookup/Enrichment).
>> >> >
>> >> > "At least Once" Processing semantics is needed for the pipeline to 
>> >> > avoid data loss.
>> >> >
>> >> > Checkpointing is disabled and we are dependent on the auto offset 
>> >> > commit of Kafka consumer for fault tolerance currently.
>> >> >
>> >> > As auto offset commit indicates that "the record is successfully read", 
>> >> > instead of "the record is successfully processed", there will be data 
>> >> > loss if there is a restart when the offset is committed to Kafka but 
>> >> > not successfully processed by the Flink Pipeline, as the record is NOT 
>> >> > replayed again when the pipeline is restarted.
>> >> >
>> >> > Checkpointing can solve this problem. But, since the pipeline is 
>> >> > stateless, we do not want to use checkpointing, which will persist all 
>> >> > the records in Windowing Operator and in-flight Async I/O calls.
>> >> >
>> >> > Question:
>> >> >
>> >> > We are looking for other ways to guarantee "at least once" processing 
>> >> > without checkpointing. One such way is to manage Kafka Offsets 
>> >> > Externally.
>> >> >
>> >> > We can maintain offsets of each partition of each topic in Cassandra(or 
>> >> > maintain timestamp, where all records with timestamps less than this 
>> >> > timestamp are successfully processed) and configure Kafka consumer 
>> >> > Start Position - setStartFromTimestamp() or 
>> >> > setStartFromSpecificOffsets()
>> >> >
>> >> > This will be helpful if the pipeline is manually restarted (say, 
>> >> > JobManager pod is restarted). But, how to avoid data loss in case of 
>> >> > internal restarts?
>> >> >
>> >> > Has anyone used this approach?
>> >> > What are other ways to guarantee "at least once" processing without 
>> >> > checkpointing for a stateless Flink pipeline?
>> >> >
>> >> > Thanks,
>> >> > Rahul

Reply via email to