Hi, we are using the Beam Python SDK with the Flink runner; the Beam version is 2.41.0 and the Flink version is 1.15.4.
We have a pipeline with 2 stages:
1. Read from Kafka and apply a fixed window of 1 minute.
2. Aggregate the data for the past minute, reshuffle so that we have a smaller partition count, and write the results to S3.

We disabled `enable.auto.commit` and enabled `commit_offset_in_finalize`; `auto.offset.reset` is set to "latest". (A sketch of the pipeline setup is at the end of this message.)

According to the logs, the missing data was definitely consumed from Kafka, because there are many lines like
```
Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
```
and those partition/offset pairs do match the missing records. However, the records never show up in the final S3 output.

My current hypothesis is that the reshuffle might be the cause. For example, suppose that for the past minute, Kafka partition 1 holds records at offsets 1, 2, and 3. After the reshuffle they are distributed across, say:
- partition A: 1, 3
- partition B: 2

If partition A completes successfully but partition B fails, then A, having succeeded, commits its offset to Kafka, so the committed offset advances to 3. When consumption resumes from the committed offset, offset 2 is skipped.

However, I am not sure how exactly the offset commit works, and I am wondering how it interacts with checkpoints. Also, if my hypothesis were correct, we should be seeing many more missing records, yet this seems to happen rarely. Can anyone help identify potential root causes? Thanks
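Here is the sketch of the pipeline setup mentioned above. It is a minimal reconstruction of the shape I described, not our actual code: the broker address, group id, topic, key names, and S3 path are placeholders, the real aggregation logic is omitted, and the `WriteToText` sink is a stand-in for our actual S3 writer.

```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.transforms.window import FixedWindows


def run():
    # Flink runner / pipeline options omitted; this only sketches the shape.
    with beam.Pipeline() as p:
        (
            p
            # Stage 1: read from Kafka with a 1-minute fixed window.
            | "ReadFromKafka" >> ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": "broker:9092",  # placeholder
                    "group.id": "my-consumer-group",     # placeholder
                    "enable.auto.commit": "false",
                    "auto.offset.reset": "latest",
                },
                topics=["my-topic"],                     # placeholder
                # Offsets are committed back to Kafka only when the
                # runner finalizes a checkpoint, not when records are read.
                commit_offset_in_finalize=True,
            )
            | "Window1Min" >> beam.WindowInto(FixedWindows(60))
            # Stage 2: aggregate the past minute per key, then reshuffle
            # to reduce the partition count before writing out.
            | "Aggregate" >> beam.GroupByKey()
            | "Reshuffle" >> beam.Reshuffle()
            # Stand-in for our real S3 sink.
            | "WriteToS3" >> beam.io.WriteToText("s3://bucket/output")  # placeholder
        )


if __name__ == "__main__":
    run()
```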
