Thank you Luke, it makes sense.

I have made the update on my application. Thanks all for your feedback!

On 2021/06/24 02:26:49, Luke Chen <show...@gmail.com> wrote: 
> Hi Tao,
> The Round-Robin assignor is OK, for sure.
> But since the *StickyAssignor* doesn't get affected by this bug, I'd
> suggest you use it. After all, the StickyAssignor will have better
> performance because it preserves the existing assignments as much as
> possible to reduce the overheads to re-assign the topic partitions/tasks.
> 
> Thank you.
> Luke
> 
> On Thu, Jun 24, 2021 at 10:13 AM Tao Huang <sandy.huang...@gmail.com> wrote:
> 
> > Thank you Sophie and Luke for the confirmation.
> >
> > @Luke, the reason I think the assignor strategy may not play an important
> > role in my application is that,  my application workflow does not rely on
> > partition assigned, what it does is just to poll the event and process the
> > payload without any stickiness to the assigned partition, and use
> > auto-commit for "at most once" event consumption. It does not track the
> > offset or any other meta data of partitions it works on. For such pattern,
> > I think I may use Round-Robin strategy. Any suggestions?
> >
> > Thanks!
> >
> > On 2021/06/24 00:15:57, Luke Chen <show...@gmail.com> wrote:
> > > Hi Sophie,
> > > Thanks for your clarification. :)
> > >
> > > Luke
> > >
> > > Sophie Blee-Goldman <sop...@confluent.io.invalid> 於 2021年6月24日 週四
> > 上午8:00 寫道:
> > >
> > > > Just to clarify, this bug actually does impact only the
> > cooperative-sticky
> > > > assignor. The cooperative sticky assignor gets its
> > > > "ownedPartitions" input from the (possibly corrupted)
> > SubscriptionState,
> > > > while the plain sticky assignor has to rely on
> > > > keeping track of these partitions itself, since in eager rebalancing
> > the
> > > > "ownedPartitions" are always empty during a rebalance.
> > > > So you can safely use the regular sticky assignor to avoid this issue.
> > > >
> > > > On Wed, Jun 23, 2021 at 4:38 PM Luke Chen <show...@gmail.com> wrote:
> > > >
> > > > > Hi Tao,
> > > > > 1. So this bug only applies to cooperative-sticky assignor?
> > > > > --> Yes, this bug only applies to sticky assignor (both eager and
> > > > > cooperative) since we will refer to the consumer's previous
> > assignment.
> > > > >
> > > > > 2. Does assignor strategy (cooperative-sticky vs sticky vs others)
> > really
> > > > > matter in this case?
> > > > > --> No, the assignor strategy won't affect the at most once. They are
> > > > > independent concepts.
> > > > >
> > > > > That is, to workaround this issue, please change to a non-sticky
> > assignor
> > > > > before the bug fixed.
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > Tao Huang <sandy.huang...@gmail.com> 於 2021年6月23日 週三 下午9:34 寫道:
> > > > >
> > > > > > Thank you Sophie for sharing the details.
> > > > > >
> > > > > > So this bug only applies to cooperative-sticky assignor? Should I
> > > > switch
> > > > > > to other strategy (eg: StickyAssignor) while I am waiting for the
> > fix?
> > > > > >
> > > > > > On the other hand, my application is using "auto-commit" mechanism
> > for
> > > > > "at
> > > > > > most once" event consuming. Does assignor strategy
> > (cooperative-sticky
> > > > vs
> > > > > > sticky vs others) really matter in this case? My understanding is
> > that,
> > > > > > regardless which strategy is used, the members in the group have to
> > > > > rejoin
> > > > > > when re-balance happens.
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > Tao
> > > > > >
> > > > > > On 2021/06/23 02:01:04, Sophie Blee-Goldman
> > > > <sop...@confluent.io.INVALID
> > > > > >
> > > > > > wrote:
> > > > > > > Here's the ticket:
> > https://issues.apache.org/jira/browse/KAFKA-12984
> > > > > > >
> > > > > > > And the root cause of that itself:
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-12983
> > > > > > >
> > > > > > > On Tue, Jun 22, 2021 at 6:15 PM Sophie Blee-Goldman <
> > > > > sop...@confluent.io
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Tao,
> > > > > > > >
> > > > > > > > We recently discovered a bug in the way that the consumer
> > tracks
> > > > > > partition
> > > > > > > > metadata which may cause the cooperative-sticky assignor to
> > throw
> > > > > this
> > > > > > > > exception in the case of a consumer that dropped out of the
> > group
> > > > at
> > > > > > some
> > > > > > > > point. I'm just about to file a ticket for it, and it should be
> > > > fixed
> > > > > > in
> > > > > > > > the upcoming releases.
> > > > > > > >
> > > > > > > > The problem is that some consumers are claiming to own
> > partitions
> > > > > that
> > > > > > > > they no longer actually own after having dropped out. If you
> > can
> > > > > narrow
> > > > > > > > down the problematic consumers and restart them, it should
> > resolve
> > > > > the
> > > > > > > > issue. I believe you should be able to tell which consumers are
> > > > > > claiming
> > > > > > > > partitions they no longer own based on the logs, but another
> > option
> > > > > is
> > > > > > just
> > > > > > > > to restart all the consumers (or do a rolling restart until the
> > > > > problem
> > > > > > > > goes away).
> > > > > > > >
> > > > > > > > I'll follow up here with the ticket link once I've filed it.
> > > > > > > >
> > > > > > > > -Sophie
> > > > > > > >
> > > > > > > > On Tue, Jun 22, 2021 at 12:07 PM Tao Huang <
> > > > sandy.huang...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Thanks for the feedback.
> > > > > > > >>
> > > > > > > >> It seems the referred bug is on the server (Broker) side? I
> > just
> > > > > > checked
> > > > > > > >> my Kafka Broker version, it is actually on 2.4.1. So the bug
> > seems
> > > > > > does not
> > > > > > > >> apply to my case.
> > > > > > > >>
> > > > > > > >> Should I downgrade my client (Java library) version to 2.4.1?
> > > > > > > >>
> > > > > > > >> Thanks!
> > > > > > > >>
> > > > > > > >> On 2021/06/21 20:04:31, Ran Lupovich <ranlupov...@gmail.com>
> > > > wrote:
> > > > > > > >> >
> > > > > >
> > > >
> > https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-12890
> > > > > > > >> >
> > > > > > > >> > Check out this jira ticket
> > > > > > > >> >
> > > > > > > >> > בתאריך יום ב׳, 21 ביוני 2021, 22:15, מאת Tao Huang ‏<
> > > > > > > >> > sandy.huang...@gmail.com>:
> > > > > > > >> >
> > > > > > > >> > > Hi There,
> > > > > > > >> > >
> > > > > > > >> > > I am experiencing intermittent issue when consumer group
> > stuck
> > > > > on
> > > > > > > >> > > "Completing-Reblalance" state. When this is happening,
> > client
> > > > > > throws
> > > > > > > >> error
> > > > > > > >> > > as below:
> > > > > > > >> > >
> > > > > > > >> > > 2021-06-18 13:55:41,086 ERROR
> > io.mylab.adapter.KafkaListener
> > > > > > > >> > >
> > [edfKafkaListener:CIO.PandC.CIPG.InternalLoggingMetadataInfo]
> > > > > > > >> Exception on
> > > > > > > >> > > Kafka listener (InternalLoggingMetadataInfo) - Some
> > partitions
> > > > > are
> > > > > > > >> > > unassigned but all consumers are at maximum capacity
> > > > > > > >> > > java.lang.IllegalStateException: Some partitions are
> > > > unassigned
> > > > > > but
> > > > > > > >> all
> > > > > > > >> > > consumers are at maximum capacity
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.constrainedAssign(AbstractStickyAssignor.java:248)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:81)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.CooperativeStickyAssignor.assign(CooperativeStickyAssignor.java:64)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:66)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:686)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:111)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1151)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1126)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
> > > > > > > >> > > at
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >>
> > > > > >
> > > > >
> > > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
> > > > > > > >> > > at
> > > > io.mylab.adapter.KafkaListener.run(EdfKafkaListener.java:93)
> > > > > > > >> > > at java.lang.Thread.run(Thread.java:748)
> > > > > > > >> > >
> > > > > > > >> > > The option to exit the state is to stop some members of
> > the
> > > > > > consumer
> > > > > > > >> group.
> > > > > > > >> > >
> > > > > > > >> > > Version: 2.6.1
> > > > > > > >> > > PARTITION_ASSIGNMENT_STRATEGY: CooperativeStickyAssignor
> > > > > > > >> > >
> > > > > > > >> > > Would you please advise what would be the condition to
> > trigger
> > > > > > such
> > > > > > > >> issue?
> > > > > > > >> > >
> > > > > > > >> > > Thanks!
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 

Reply via email to