Hi,

I've just pushed the PoC, which now works on top of the Spring Kafka listener
container. The main idea is to use a ConsumerRebalanceListener to watch for
changes in the partitions the consumer member is bound to, and to take
advantage of the Spring container to pause it. The relevant code is here:
https://github.com/asorian0/kafka-admin-poc/blob/main/consumer/src/main/java/dev/asoriano/kafkaadminpoc/KafkaConsumerRebalanceListener.java
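For anyone who doesn't want to open the repo, here is a stripped-down sketch of the idea. It is not the PoC code: Kafka and Spring types are replaced by plain Java stand-ins so it runs without a broker. `Partition` stands in for `TopicPartition`, the two callbacks mirror `ConsumerRebalanceListener#onPartitionsRevoked` and `#onPartitionsAssigned`, and the hypothetical `Pausable` interface stands in for the container's `pause()`/`resume()`. The policy (pause when a rebalance leaves the member without any of its watched partitions, resume when one comes back) is an assumption for illustration.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Stand-in for org.apache.kafka.common.TopicPartition, so the sketch runs without Kafka.
record Partition(String topic, int partition) {}

// Hypothetical stand-in for the Spring listener container's pause()/resume().
interface Pausable {
    void pause();
    void resume();
}

// Mirrors the ConsumerRebalanceListener callbacks; the pause policy is illustrative only.
final class PausingRebalanceHandler {
    private final Set<Partition> watched;              // partitions this member should own
    private final Set<Partition> owned = new HashSet<>();
    private final Pausable container;
    private boolean paused = false;

    PausingRebalanceHandler(Set<Partition> watched, Pausable container) {
        this.watched = Set.copyOf(watched);
        this.container = container;
    }

    void onPartitionsRevoked(Collection<Partition> revoked) {
        // Only update bookkeeping here; decide once the new assignment is known.
        owned.removeAll(revoked);
    }

    void onPartitionsAssigned(Collection<Partition> assigned) {
        owned.addAll(assigned);
        boolean ownsWatched = owned.stream().anyMatch(watched::contains);
        if (!ownsWatched && !paused) {
            container.pause();   // lost every watched partition: stop consuming
            paused = true;
        } else if (ownsWatched && paused) {
            container.resume();  // a watched partition is back: consume again
            paused = false;
        }
    }

    boolean isPaused() {
        return paused;
    }
}
```

Tracking owned partitions across both callbacks keeps the sketch consistent under eager rebalancing (everything revoked, then the full assignment) as well as cooperative rebalancing (only the deltas are passed).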

Note that this is the simplest case, but I guess it is enough to demonstrate
one way to achieve granular control of members. In any case, I think it would
be better to make the Kafka code itself capable of this. My first thought was
to jump into the ConsumerCoordinator and make it able to pause/resume the
listener itself, so we wouldn't need to rely on a wrapper like the one Spring
Kafka provides here.

Take a look at the log at
https://github.com/asorian0/kafka-admin-poc/blob/main/drop-member.log,
where we can find the following:
2025-02-12T13:24:56.073+01:00  INFO 26151 --- [kafka-admin-poc-consumer]
[ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer
clientId=consumer-test-group-1, groupId=test-group] Request joining group
due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response

Would it make sense to define a specific value for paused members? That way,
instead of UNKNOWN_MEMBER_ID we would get something like
DROPPED_MEMBER_ID_BY_ADMIN, which the ConsumerCoordinator could handle by
putting the listener into a paused state. (In that case we'd also need
something like ATTACHED_MEMBER_ID_BY_ADMIN to handle the opposite scenario.)
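A rough sketch of how the client side could branch on such codes. `DROPPED_MEMBER_ID_BY_ADMIN` and `ATTACHED_MEMBER_ID_BY_ADMIN` are only the hypothetical values proposed above; the enum is a local stand-in rather than Kafka's `Errors`, and the `Action` values are illustrative names for what the ConsumerCoordinator might do.

```java
// Local stand-in for the group-membership error codes discussed above.
// Only UNKNOWN_MEMBER_ID exists in Kafka today; the other two are the proposal.
enum MembershipError {
    NONE,
    UNKNOWN_MEMBER_ID,
    DROPPED_MEMBER_ID_BY_ADMIN,  // hypothetical: member removed on purpose by an admin
    ATTACHED_MEMBER_ID_BY_ADMIN  // hypothetical: admin lets the member back in
}

// Illustrative next steps for the consumer after a group response.
enum Action { CONTINUE, REJOIN, PAUSE, RESUME_AND_REJOIN }

final class MembershipErrorHandler {
    private MembershipErrorHandler() {}

    static Action nextAction(MembershipError error) {
        return switch (error) {
            case NONE -> Action.CONTINUE;
            // Current behaviour (see the log above): request joining the group again.
            case UNKNOWN_MEMBER_ID -> Action.REJOIN;
            // Proposed: stay out of the group and pause the listener instead of rejoining.
            case DROPPED_MEMBER_ID_BY_ADMIN -> Action.PAUSE;
            // Proposed: resume the listener and rejoin.
            case ATTACHED_MEMBER_ID_BY_ADMIN -> Action.RESUME_AND_REJOIN;
        };
    }
}
```

The point of a dedicated code is that UNKNOWN_MEMBER_ID cannot distinguish an intentional admin removal from other causes such as an expired session, so rejoining is the only safe reaction to it.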

Also, would it make more sense to think about freezing the partitions
themselves, so that no listener can consume them, instead of focusing on the
consumer group members?
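To compare the two approaches, partition-level freezing can be pictured at assignment time: frozen partitions are simply left out, so nobody consumes them no matter which members are in the group. This is a plain-Java round-robin sketch with a hypothetical `frozen` set; it does not implement Kafka's `ConsumerPartitionAssignor` interface, and where that set would live (broker side, admin API) is left open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class FreezingRoundRobinAssignor {
    private FreezingRoundRobinAssignor() {}

    // Assigns partitions 0..partitionCount-1 round-robin over the members (sorted for
    // determinism), skipping every partition in the hypothetical admin-managed `frozen` set.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount, Set<Integer> frozen) {
        List<String> sorted = members.stream().sorted().toList();
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String member : sorted) {
            result.put(member, new ArrayList<>());
        }
        if (sorted.isEmpty()) {
            return result;
        }
        int next = 0;
        for (int p = 0; p < partitionCount; p++) {
            if (frozen.contains(p)) {
                continue; // frozen: no member gets this partition
            }
            result.get(sorted.get(next % sorted.size())).add(p);
            next++;
        }
        return result;
    }
}
```

With this approach the group membership itself is untouched and rebalances proceed normally; the frozen partitions would just accumulate lag until they are unfrozen.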

Do you think we have enough to move forward with this as a KIP?

Thanks,

On Tue, 11 Feb 2025 at 16:00, Armando Soriano (<asoriano....@gmail.com>)
wrote:

> Hi there,
>
> The PoC is here: https://github.com/asorian0/kafka-admin-poc. I just found
> out that I was using Kafka 3 instead of 4, and it seems that the changes in
> MemberToRemove do not work with the latest major version. Do you know what
> might have changed?
>
> The actual error is:
>  org.apache.kafka.common.errors.UnknownMemberIdException: The coordinator
> is not aware of this member.
>
> The thing is, we're sending the same data as when it works on
> version 3.5.2.
>
> I'll get back to you soon with the interceptor updates.
>
> Thanks,
>
>
> On Mon, 10 Feb 2025 at 10:05, Armando Soriano (<asoriano....@gmail.com>)
> wrote:
>
>> Hi Tsai,
>>
>> Thanks for your quick response, and sorry for the late reply. I'm involved
>> in some other investigations that are taking up my time, but I'm going to
>> move forward on this one from now on.
>>
>> Your questions really hit the spot, though, and I guess I partly
>> anticipated them at the end of my first email:
>>
>> >> I figured out that, to make sure members do not connect to the consumer
>> >> group again, we'd need the listener (we're using spring-kafka) to stop
>> >> trying. I guess it could be done by pausing it from the service code
>> >> itself, by looking at the reason for the disconnection (the one we pass
>> >> to removeMembersFromConsumerGroup). But everything depends on being able
>> >> to drop the members by setting the proper memberId value from the admin
>> >> client.
>>
>> So, I'm going to work on a PoC exploring the following:
>> - Whether the reason set from the admin client actually reaches the
>> member. If it does, I'd work on observing it in an interceptor and having
>> the listener pause the connection (this is feasible through Spring Kafka's
>> listener API). If it does not, I'd like to check how the server handles it
>> (in that case I'd really appreciate your knowledge to guide me to where in
>> the code that happens).
>> - How Spring Kafka's .pause() method puts the member to sleep.
>> - Maybe it would make sense to check the Kafka server code to see whether
>> it would be feasible to exclude the partition the disconnected member has
>> been consuming from rebalancing, for this very case.
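Regarding the first point: if the reason does reach the member, a simple convention would make it machine-readable. The sketch below assumes a hypothetical `admin-pause:` prefix in the reason string passed to removeMembersFromConsumerGroup; nothing in Kafka defines this format, and whether the member ever sees that string is exactly what the PoC has to verify.

```java
import java.util.Optional;

// Hypothetical convention for the free-text removal reason; not defined by Kafka.
// Only useful if the reason actually reaches the removed member.
final class AdminPauseReason {
    static final String PREFIX = "admin-pause:";

    private AdminPauseReason() {}

    // Service side: build the reason string for a given pause request id.
    static String encode(String requestId) {
        return PREFIX + requestId;
    }

    // Member side (e.g. in an interceptor): return the request id if the removal
    // was an admin pause, or empty for any other reason (or no reason at all).
    static Optional<String> decode(String reason) {
        if (reason == null || !reason.startsWith(PREFIX)) {
            return Optional.empty();
        }
        return Optional.of(reason.substring(PREFIX.length()));
    }
}
```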
>>
>> Once I have something in the PoC I'll let you know, hopefully this week.
>> In any case, if you can anticipate anything about the roadmap above, please
>> feel free to share your thoughts.
>>
>> Thanks,
>>
>>
>> On Tue, 4 Feb 2025 at 15:30, Chia-Ping Tsai (<chia7...@gmail.com>)
>> wrote:
>>
>>> Hi Armando,
>>>
>>> Thanks for your question and contribution. A few questions are listed
>>> below. PTAL.
>>>
>>> > I'm working on a Java service that allows managing consumer group
>>> > members, especially pausing/resuming them on demand.
>>>
>>> Out of curiosity, could you elaborate on how the
>>> removeMembersFromConsumerGroup method will manage dynamic members? Even
>>> after removing a dynamic member from the group, the consumer has the
>>> potential to rejoin the group at any time.
>>>
>>> > MemberToRemove class uses the static value
>>> > "JoinGroupRequest.UNKNOWN_MEMBER_ID" for the memberId field used in the
>>> > member deletion logic, so it is expected that the Kafka service responds
>>> > that way.
>>>
>>> I understand that MemberToRemove is designed to remove either a specific
>>> static member or "all" members from the group. Since dynamic members have
>>> randomly assigned IDs, I'm curious how you plan to pause a specific
>>> dynamic consumer. More specifically, how does the administrator obtain the
>>> (random) member ID? I assume this is achieved by comparing the clientId
>>> and host information returned by `describeConsumerGroups`?
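A sketch of that lookup in plain Java: pick the member whose clientId and host both match, and refuse to guess when there is no match or more than one (client.id is not required to be unique, and several consumers can share a host). `Member` stands in for the fields that `MemberDescription` exposes through `consumerId()`, `clientId()` and `host()`; `MemberLookup` is a hypothetical helper, not part of the Admin API.

```java
import java.util.List;
import java.util.Optional;

// Stand-in for the MemberDescription fields returned by describeConsumerGroups.
record Member(String memberId, String clientId, String host) {}

// Hypothetical helper: resolves a dynamic member id from clientId + host.
final class MemberLookup {
    private MemberLookup() {}

    // Returns the member id only if exactly one member matches both clientId and host.
    static Optional<String> findMemberId(List<Member> members, String clientId, String host) {
        List<Member> matches = members.stream()
                .filter(m -> m.clientId().equals(clientId) && m.host().equals(host))
                .toList();
        if (matches.size() != 1) {
            return Optional.empty(); // none found, or ambiguous: do not guess
        }
        return Optional.of(matches.get(0).memberId());
    }
}
```

The resolved id is what the PoC change lets the admin client send as memberId, instead of the fixed JoinGroupRequest.UNKNOWN_MEMBER_ID.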
>>>
>>> Best,
>>> Chia-Ping
>>>
>>>
>>>
>>> On Tue, 4 Feb 2025 at 8:30 PM, Armando Soriano <asoriano....@gmail.com>
>>> wrote:
>>>
>>> > Hi there,
>>> >
>>> > I'm working on a Java service that allows managing consumer group
>>> > members, especially pausing/resuming them on demand. After some research,
>>> > it seems that the current AdminClient makes this feasible through the
>>> > removeMembersFromConsumerGroup method [
>>> > https://kafka.apache.org/38/javadoc/org/apache/kafka/clients/admin/KafkaAdminClient.html#removeMembersFromConsumerGroup(java.lang.String,org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions)
>>> > ].
>>> > The thing is, while trying to implement the solution, I found that the
>>> > client was returning an error saying something like "unknown member is
>>> > not found". I dug into the sources to find out why and found that the
>>> > error was actually accurate, because the MemberToRemove class uses the
>>> > static value "JoinGroupRequest.UNKNOWN_MEMBER_ID" for the memberId field
>>> > used in the member deletion logic, so it is expected that the Kafka
>>> > service responds that way.
>>> >
>>> > As a PoC, I updated the code to be able to set a given memberId value
>>> > (extracted from the describeConsumerGroups method), and it worked as
>>> > expected. I created a PR, https://github.com/apache/kafka/pull/18738, as
>>> > a quick explanation for applying to ASF Jira. In that PR I was kindly
>>> > asked to start the conversation on this mailing list, so here we are.
>>> >
>>> > I figured out that, to make sure members do not connect to the consumer
>>> > group again, we'd need the listener (we're using spring-kafka) to stop
>>> > trying. I guess it could be done by pausing it from the service code
>>> > itself, by looking at the reason for the disconnection (the one we pass
>>> > to removeMembersFromConsumerGroup). But everything depends on being able
>>> > to drop the members by setting the proper memberId value from the admin
>>> > client.
>>> >
>>> > Do you think it makes sense to be able to handle members like that? Are
>>> > there any security implications, or risks of Kafka service malfunction,
>>> > in handling them that way?
>>> >
>>> > Thanks in advance,
>>> >
>>>
>>
