Hi Tsai,

Thanks for your quick response, and sorry for the late reply. I've been
involved in some other investigations that are consuming my time, but I'm
going to move forward with this one from now on.

Your questions really hit the spot, and I guess I partly anticipated them in
the last paragraph of my first email:

> I figured out that, to make sure members don't reconnect to the consumer
> group, we'd need the listener (we're using spring-kafka) to stop trying.
> I guess it could be done by pausing it from the service code itself, by
> watching the reason for the disconnection (the reason we pass to
> removeMembersFromConsumerGroup). But everything depends on being able to
> drop the members by setting the proper memberId from the admin client.

So, I'm going to work on a PoC exploring the following:
- Whether the reason set from the admin client actually reaches the member.
  If it does, I'd observe it in an interceptor and have the listener pause
  its connection (this is feasible through spring-kafka's listener API). If
  it doesn't, I'd like to check how the server handles it (in that case I'd
  really appreciate your guidance on where in the code that happens).
- How spring-kafka's .pause() method actually puts the member to sleep.
- Maybe the Kafka server code, to see whether it would be feasible, for this
  specific case, to keep the partitions the disconnected member was consuming
  from being reassigned during the rebalance.
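One way the admin side could obtain a dynamic member's random ID, following
Chia-Ping's suggestion of comparing the clientId and host returned by
describeConsumerGroups, is sketched below. It covers only the matching step
and is written in plain Java so it runs without a broker. Member is a
hypothetical stand-in for org.apache.kafka.clients.admin.MemberDescription
(its consumerId(), clientId() and host() accessors), and the sample IDs and
addresses are made up. Since client.id is not required to be unique, the
sketch returns nothing on an ambiguous match instead of guessing.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class MemberLookup {

    // Hypothetical stand-in for MemberDescription, keeping only what the lookup needs.
    // In kafka-clients these values come from consumerId(), clientId() and host().
    public record Member(String memberId, String clientId, String host) { }

    // Returns the member ID of the single member matching both clientId and host,
    // or Optional.empty() when nothing matches or more than one member matches.
    public static Optional<String> findMemberId(List<Member> members, String clientId, String host) {
        List<Member> matches = members.stream()
                .filter(m -> m.clientId().equals(clientId) && m.host().equals(host))
                .collect(Collectors.toList());
        // client.id is not guaranteed to be unique, so refuse to pick between several matches.
        if (matches.size() != 1) {
            return Optional.empty();
        }
        return Optional.of(matches.get(0).memberId());
    }

    public static void main(String[] args) {
        // Made-up sample data shaped like a describeConsumerGroups result.
        List<Member> members = List.of(
                new Member("orders-app-1-3f2a", "orders-app-1", "/10.0.0.5"),
                new Member("orders-app-2-9c1b", "orders-app-2", "/10.0.0.6"),
                new Member("orders-app-2-77e0", "orders-app-2", "/10.0.0.7"));
        // prints "orders-app-2-9c1b"
        System.out.println(findMemberId(members, "orders-app-2", "/10.0.0.6").orElse("no unique match"));
    }
}
```

With a real Admin client, the input list would come from
admin.describeConsumerGroups(List.of(groupId)).all().get().get(groupId).members(),
mapping each MemberDescription into a Member. The resulting ID is the value
the change proposed in PR 18738 would pass as memberId.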

Once I have something from the PoC I'll let you know, hopefully this week.
Anyway, if you can anticipate anything about the roadmap above, please feel
free to share your thoughts.

Thanks,


On Tue, 4 Feb 2025 at 15:30, Chia-Ping Tsai (<chia7...@gmail.com>)
wrote:

> Hi Armando
>
> Thanks for your question and contribution. A few questions are listed below.
> PTAL
>
> > I'm working on a Java service that allows managing a consumer group's
> > members, especially to pause/resume them on demand.
>
> Out of curiosity, could you elaborate on how the
> removeMembersFromConsumerGroup method will manage dynamic members? Even
> after removing a dynamic member from the group, the consumer has the
> potential to rejoin the group at any time.
>
> > the MemberToRemove class uses a static value,
> > "JoinGroupRequest.UNKNOWN_MEMBER_ID", for the memberId field that is used
> > in the member deletion logic, so it is expected that the Kafka broker
> > responds with that error.
>
> I understand that MemberToRemove is designed to remove either a specific
> static member or "all" members from the group. Since dynamic members have
> randomly assigned IDs, I'm curious how you plan to pause a specific dynamic
> consumer. More specifically, how does the administrator obtain the (random)
> member ID? I assume this is achieved by comparing the clientId and host
> information returned by `describeConsumerGroups`?
>
> Best,
> Chia-Ping
>
>
>
> On Tue, 4 Feb 2025 at 8:30 PM, Armando Soriano <asoriano....@gmail.com> wrote:
>
> > Hi there,
> >
> > I'm working on a Java service that allows managing a consumer group's
> > members, especially to pause/resume them on demand. After checking here
> > and there, it seems that the current AdminClient makes this feasible
> > through the removeMembersFromConsumerGroup method [
> > https://kafka.apache.org/38/javadoc/org/apache/kafka/clients/admin/KafkaAdminClient.html#removeMembersFromConsumerGroup(java.lang.String,org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions)
> > ].
> > The point is that, when I tried to implement the solution, I found that
> > the client was returning an error saying something like "unknown member
> > is not found". I jumped into the sources to find out why and found that
> > the error was actually accurate, because the MemberToRemove class uses a
> > static value, "JoinGroupRequest.UNKNOWN_MEMBER_ID", for the memberId
> > field that is used in the member deletion logic, so it is expected that
> > the Kafka broker responds with that error.
> >
> > As a PoC, I updated the code to allow setting a given memberId value
> > (extracted from the describeConsumerGroups method), and it worked as
> > expected. I created a PR, https://github.com/apache/kafka/pull/18738, as a
> > quick explanation to apply to ASF Jira. In that PR I was kindly asked to
> > start the conversation on this mailing list, so here we are.
> >
> > I figured out that, to make sure members don't reconnect to the consumer
> > group, we'd need the listener (we're using spring-kafka) to stop trying.
> > I guess it could be done by pausing it from the service code itself, by
> > watching the reason for the disconnection (the reason we pass to
> > removeMembersFromConsumerGroup). But everything depends on being able to
> > drop the members by setting the proper memberId from the admin client.
> >
> > Do you think it makes sense to be able to handle members like that? Are
> > there any security or Kafka service malfunction implications in handling
> > them that way?
> >
> > Thanks in advance,
> >
>
