Hey Alexander,

Thanks for the feedback and apologies for my late reply.
This validates my understanding of AT_LEAST_ONCE wrt the Kafka producer. I tried to reproduce the issue, but came back empty handed. As you pointed out, the culprit could be a call to an external, non-idempotent API. I'll follow up should we stumble upon this issue again.

On Thu, Oct 26, 2023 at 9:55 PM Alexander Fedulov <alexander.fedu...@gmail.com> wrote:
>
> * to clarify: by different output I mean that for the same input message the
> output message could be slightly smaller due to the abovementioned factors
> and fall into the allowed size range without causing any failures
>
> On Thu, 26 Oct 2023 at 21:52, Alexander Fedulov <alexander.fedu...@gmail.com> wrote:
>>
>> Your expectations are correct. In case of AT_LEAST_ONCE Flink will wait for
>> all outstanding records in the Kafka buffers to be acknowledged before
>> marking the checkpoint successful (= also recording the offsets of the
>> sources). That said, there might be other factors involved that could lead
>> to a different output even when reading the same data from the sources -
>> such as using processing time (instead of event time) or doing some
>> sort of lookup calls to external systems. If you absolutely cannot think of
>> a scenario where this could be the case for your application, please try to
>> reproduce the error reliably - this is something that needs to be further
>> looked into.
>>
>> Best,
>> Alexander Fedulov
>>
>> On Mon, 23 Oct 2023 at 19:11, Gabriele Modena <gmod...@wikimedia.org> wrote:
>>>
>>> Hey folks,
>>>
>>> We currently run (py) Flink 1.17 on k8s (managed by the Flink k8s
>>> operator), with HA and checkpointing (fixed retries policy). We
>>> produce into Kafka with the AT_LEAST_ONCE delivery guarantee.
>>>
>>> Our application failed when trying to produce a message larger than
>>> Kafka's message.max.bytes. This offset was never
>>> going to be committed, so Flink HA was not able to recover the
>>> application.
>>>
>>> Upon a manual restart, it looks like the offending offset has been
>>> lost: it was not picked up after rewinding to the checkpointed offset,
>>> and it was not committed to Kafka. I would have expected this offset
>>> to not have made it past the KafkaProducer commit checkpoint barrier,
>>> and that the app would fail again on it.
>>>
>>> I understand that there are failure scenarios that could end in data
>>> loss when the Kafka delivery guarantee is set to EXACTLY_ONCE and Kafka
>>> expires an uncommitted transaction.
>>>
>>> However, it's unclear to me whether other corner cases would apply to
>>> the AT_LEAST_ONCE guarantee. Upon broker failure and app restarts, I
>>> would expect duplicate messages but no data loss. What I can see as a
>>> problem is that this commit was never going to happen.
>>>
>>> Is this expected behaviour? Am I missing something here?
>>>
>>> Cheers,
>>> --
>>> Gabriele Modena (he / him)
>>> Staff Software Engineer
>>> Wikimedia Foundation

--
Gabriele Modena (he / him)
Staff Software Engineer
Wikimedia Foundation
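
P.S. For anyone finding this thread later: a minimal sketch of the kind of PyFlink 1.17 KafkaSink setup discussed above. Broker address, topic name, and serialization schema are illustrative placeholders, not our actual configuration.

# Minimal sketch (PyFlink 1.17); broker, topic, and value schema are placeholders.
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema

sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("kafka-broker:9092")  # placeholder broker address
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("output-topic")  # placeholder topic
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    # AT_LEAST_ONCE: the sink flushes outstanding records on checkpoint, so the
    # checkpoint only completes once Kafka has acknowledged them (as Alexander
    # explains above). Duplicates are possible after a restart; loss should not be.
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    # Producer-side size cap; the broker-side counterpart is message.max.bytes.
    # Records exceeding either limit are rejected and fail the job, as we observed.
    .set_property("max.request.size", "1048576")
    .build()
)

# stream.sink_to(sink)  # attach to a DataStream of strings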