Hello community,

Any help here please?

Thanks,
Megh

On Mon, May 19, 2025, 18:48 megh vidani <vidanimeg...@gmail.com> wrote:

> I'm aware that Spark does not rely on the offsets committed to Kafka.
> Committing them is purely for monitoring purposes.
>
> Thanks,
> Megh
>
> On Mon, May 19, 2025, 18:46 megh vidani <vidanimeg...@gmail.com> wrote:
>
>> Hi Prashant,
>>
>> I would like to do it so that I can monitor the consumer group along with
>> my other consumer groups.
>>
>> Thanks,
>> Megh
>>
>> On Mon, May 19, 2025, 18:21 Prashant Sharma <scrapco...@gmail.com> wrote:
>>
>>> Spark does not rely on Kafka's commits; it tracks the stream progress
>>> itself and reads via offsets (e.g. from the last read points). Why do you
>>> want to commit?
>>>
>>> On Mon, May 19, 2025 at 5:58 PM megh vidani <vidanimeg...@gmail.com>
>>> wrote:
>>>
>>>> Hello Spark Dev Community,
>>>>
>>>> Reaching out for the below problem statement.
>>>>
>>>> Thanks,
>>>> Megh
>>>>
>>>> On Mon, May 19, 2025, 13:16 megh vidani <vidanimeg...@gmail.com> wrote:
>>>>
>>>>> Hello Spark Community,
>>>>>
>>>>> I have a Structured Streaming job that consumes a topic with the same
>>>>> name from two different Kafka clusters and then unions the two streams.
>>>>> I've developed a custom query listener that commits the offsets back to
>>>>> the Kafka clusters in the onQueryProgress method, once each batch
>>>>> completes.
>>>>>
>>>>> Now here's my problem: to commit these offsets back to Kafka, I need to
>>>>> iterate over the event.progress.sources list to fetch each individual
>>>>> source (two in my case) and read its endOffset to get the latest offsets
>>>>> consumed by the query. In its current structure, the source object does
>>>>> not store the Kafka bootstrap server info; it only stores the topic
>>>>> name. Since the topic name is the same in both streams, I can't
>>>>> determine which source belongs to cluster1 and which belongs to
>>>>> cluster2. As a result, offsets can get committed to the wrong cluster.
>>>>>
>>>>> Has anyone encountered this issue before, and does anyone have a
>>>>> solution? I tried to find a way to pass some custom metadata to the
>>>>> source object, but there doesn't seem to be one. If anyone has any
>>>>> input, please let me know.
>>>>>
>>>>> Thanks in advance.
>>>>> Megh
>>>>>
>>>>
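
For reference, a minimal Python sketch of the ambiguity described in the
original message. It assumes each source's endOffset arrives as a JSON string
of the form {"topic": {"partition": offset}}. The topic name "events", the
description strings, and the offset values are made up for illustration, and
parse_end_offset is a hypothetical helper, not a Spark API.

```python
import json


def parse_end_offset(end_offset_json):
    """Flatten an endOffset JSON string into {(topic, partition): offset}."""
    parsed = json.loads(end_offset_json)
    return {
        (topic, int(partition)): offset
        for topic, partitions in parsed.items()
        for partition, offset in partitions.items()
    }


# Hypothetical stand-ins for the two entries in event.progress.sources.
# Both read a topic with the same name, one from each cluster.
sources = [
    {"description": "KafkaV2[Subscribe[events]]",
     "endOffset": '{"events": {"0": 120, "1": 95}}'},
    {"description": "KafkaV2[Subscribe[events]]",
     "endOffset": '{"events": {"0": 40, "1": 77}}'},
]

offsets_per_source = [parse_end_offset(s["endOffset"]) for s in sources]

# The descriptions are identical, so they cannot tell the clusters apart.
same_description = sources[0]["description"] == sources[1]["description"]
```

Both entries flatten to the same (topic, partition) keys and carry the same
description, so nothing in what the listener sees says which set of offsets
belongs to which cluster.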
