Hi Lydian,
2.41.0 is quite old; can you please try the current version to see whether
this issue is still present? There were a lot of changes between 2.41.0
and 2.59.0.
Jan
On 9/17/24 17:49, Lydian Lee wrote:
Hi,
We are using Beam Python SDK with Flink Runner, the Beam version is
2.41.0 and the Flink version is 1.15.4.
We have a pipeline with 2 stages:
1. read from Kafka and apply a fixed window of 1 minute
2. aggregate the data for the past minute, reshuffle so that we have a
lower partition count, and write the results to S3.
We disabled enable.auto.commit, enabled commit_offset_in_finalize, and
set auto.offset.reset to "latest".
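For reference, this is roughly how the source is wired up (a sketch only:
the broker address, group id, topic name, and runner options are
placeholders, and the aggregation/sink steps are elided):

```python
import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    opts = PipelineOptions(runner='FlinkRunner')  # placeholder runner options
    with beam.Pipeline(options=opts) as p:
        _ = (
            p
            # Stage 1: read from Kafka; with commit_offset_in_finalize=True,
            # offsets are committed back to Kafka on bundle finalization
            # rather than auto-committed by the consumer.
            | ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': 'broker:9092',  # placeholder
                    'group.id': 'my-group',              # placeholder
                    'enable.auto.commit': 'false',
                    'auto.offset.reset': 'latest',
                },
                topics=['my-topic'],                     # placeholder
                commit_offset_in_finalize=True)
            | beam.WindowInto(window.FixedWindows(60))   # 1-minute fixed windows
            # Stage 2: aggregate, then reshuffle to reduce the partition
            # count before writing to S3 (aggregation and sink elided).
            | beam.GroupByKey()
            | beam.Reshuffle())
```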
According to the logs, the data is definitely being consumed from
Kafka, because there are many entries like
```
Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
```
and those partition/offset pairs do match the missing records. However,
the records never show up in the final S3 output.
My current hypothesis is that the reshuffle might be the cause. For
example, suppose that for the past minute Kafka partition 1 held records
at offsets 1, 2, and 3. After the reshuffle they might be distributed,
for example, as:
- partition A: 1, 3
- partition B: 2
If partition A completes successfully but partition B fails, then A's
success commits its offsets to Kafka, advancing the committed offset
to 3. When the failed work is retried, consumption resumes after the
committed offset, so offset 2 is skipped. However, I am not sure exactly
how the offset commit works or how it interacts with checkpoints. If my
hypothesis were correct, I would expect many more missing records, yet
this seems to happen only rarely. Can anyone help identify potential
root causes? Thanks
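To make the suspected interleaving concrete, here is a tiny stand-alone
simulation (plain Python, nothing Beam-specific; the bundle assignment
and the "commit the highest finished offset" rule are my assumptions,
not observed behavior):

```python
# Toy model of the suspected commit/retry race.
# Offsets 1..3 from one Kafka partition are reshuffled into two bundles.
bundles = {'A': [1, 3], 'B': [2]}  # assumed post-shuffle assignment

committed_offset = 0
delivered = set()

# Bundle A finishes and (hypothetically) commits its highest offset,
# even though bundle B, which holds offset 2, has failed.
delivered.update(bundles['A'])
committed_offset = max(bundles['A'])  # committed offset is now 3

# On retry, consumption resumes after the committed offset,
# so offset 2 is never re-read.
retried = [o for o in bundles['B'] if o > committed_offset]
delivered.update(retried)

missing = {1, 2, 3} - delivered
print(missing)  # prints {2}: offset 2 is lost under this model
```

Under this model any out-of-order commit loses the intermediate offsets,
which is why I would expect the loss to be far more frequent than what
we actually observe.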