Hi Jan,

Thanks for the recommendation. In our case we are windowing on processing
time, which means there should be no late events at all.
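As I understand it, a pane is only dropped as late once the watermark passes the end of its window plus the allowed lateness. A toy sketch of that check (this is my own illustration, not Beam's actual code; the function name is mine):

```python
def dropped_due_to_lateness(window_end, allowed_lateness, watermark):
    """Toy version of the check behind the droppedDueToLateness counter:
    a pane is droppable once the watermark has passed the end of its
    window plus the allowed lateness. (Not Beam's actual code.)"""
    return watermark > window_end + allowed_lateness

# Event time: a watermark far ahead of an old window drops its data.
assert dropped_due_to_lateness(window_end=60, allowed_lateness=60, watermark=130)

# Processing time: timestamps track arrival, so the watermark should
# never overtake a window by more than the allowed lateness.
assert not dropped_due_to_lateness(window_end=60, allowed_lateness=60, watermark=90)
```

With processing-time windowing, timestamps are assigned at arrival, so the second case should be the only one we ever hit.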

You mentioned that GroupByKey is stateful and can potentially drop data.
Given that after the reshuffle (which adds a random shuffle id to the key)
we then do the aggregation (combining the data and writing it to S3), do
you think the scenario I mentioned earlier could be the reason for the
dropped data?
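By the reshuffle I mean roughly the following, sketched here in plain Python rather than the Beam API (the helper name and `num_shards` are illustrative):

```python
import random
from collections import defaultdict

def reshuffle(records, num_shards=4):
    """Toy model of the reshuffle step: tag each record with a random
    shard id, group by that id, then drop the id again. The set of
    records is unchanged; only the physical grouping differs."""
    shards = defaultdict(list)
    for rec in records:
        shards[random.randrange(num_shards)].append(rec)
    # Flattening the shards recovers exactly the original records.
    return [rec for shard in shards.values() for rec in shard]

# Three records from one Kafka partition, identified by offset.
records = [("partition-1", offset) for offset in (1, 2, 3)]
assert sorted(reshuffle(records)) == sorted(records)
```

So the shuffle itself loses nothing; my concern is only about what happens downstream of it.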

If so, how does Beam prevent that in general? Are there any suggested
approaches? Thanks
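To restate the scenario from my earlier mail in toy form (the offsets and bundle layout are made up, and this assumes the consumer simply resumes from the highest committed offset):

```python
def resume_offset(committed):
    """Toy model: a Kafka consumer resumes a partition from the highest
    committed offset; anything below it is never re-read."""
    return max(committed) if committed else None

# After reshuffle, offsets 1-3 of one partition end up in two bundles.
bundle_a, bundle_b = [1, 3], [2]

committed = []
committed.extend(bundle_a)  # bundle A is written to S3 and its offsets committed
# bundle B fails before committing: offset 2 never reached S3

assert resume_offset(committed) == 3  # the consumer resumes at/after offset 3
assert 2 < resume_offset(committed)   # offset 2 is silently skipped on retry
```

If Beam's offset finalization really worked like this toy model, offset 2 would be lost exactly as we observe, which is why I am asking how commits interact with checkpoints.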

On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <[email protected]> wrote:

> Hi Lydian,
>
> in that case, there is only generic advice you can look into. Reshuffle
> is a stateless operation that should not cause dropped data. A GroupByKey,
> on the other hand, is stateful and thus can drop some elements when
> dealing with late data. You should be able to confirm this by looking for
> the 'droppedDueToLateness' counter and/or the log here [1]. This happens
> when elements arrive after the watermark passes the element's timestamp
> plus the allowed lateness. If you see the log, you might need to either
> change how you assign timestamps to elements (e.g. use log append time)
> or increase the allowed lateness of your windowfn.
>
> Best,
>
>  Jan
>
> [1]
> https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
> On 9/18/24 08:53, Lydian Lee wrote:
>
> I would love to, but there are some limitations on our end, so the
> version bump won't happen soon. In the meantime I need to figure out what
> might be the root cause.
>
>
> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <[email protected]> wrote:
>
>> Hi Lydian,
>>
>> 2.41.0 is quite old, can you please try the current version to see if
>> this issue is still present? There were lots of changes between 2.41.0
>> and 2.59.0.
>>
>>  Jan
>> On 9/17/24 17:49, Lydian Lee wrote:
>>
>> Hi,
>>
>> We are using Beam Python SDK with Flink Runner, the Beam version is
>> 2.41.0 and the Flink version is 1.15.4.
>>
>> We have a pipeline that has 2 stages:
>> 1. read from Kafka and apply a fixed window of 1 minute
>> 2. aggregate the data for the past minute, reshuffle so that we have a
>> lower partition count, and write it to S3.
>>
>> We disabled enable.auto.commit and enabled commit_offset_in_finalize;
>> auto.offset.reset is set to "latest".
>>
>> According to the logs, I can definitely see the data being consumed from
>> Kafka, because there are many
>> ```
>> Resetting offset for topic XXXX-<PARTITION>  to offset <OFFSET>
>> ```
>> entries, and the partition/offset pairs match the missing records.
>> However, they don't show up in the final S3 output.
>>
>> My current hypothesis is that the shuffling might be the reason for the
>> issue. For example, suppose that for the past minute, partition 1 in
>> Kafka has records at offsets 1, 2, 3. After the reshuffle they are
>> distributed, for example, as:
>> - partition A: 1, 3
>> - partition B: 2
>>
>> If partition A succeeds but partition B fails, then given that A
>> succeeded, it will commit its offset to Kafka, and Kafka now has the
>> offset at 3. When the work is retried, offset 2 will be skipped.
>> However, I am not sure how exactly the offset commit works and how it
>> interacts with the checkpoints. If my hypothesis were correct, we should
>> be seeing many more missing records, yet this seems rare. Wondering if
>> anyone can help identify potential root causes? Thanks
>>
>>
>>
>>
>>