I am using the kafka-clients through the fs2-kafka wrapper. However, the log
message I posted and copied again here

Notifying assignor about the new Assignment(partitions=[
platform-data.appquery-platform.aoav3.backfill-28,
platform-data.appquery-platform.aoav3.backfill-31,
platform-data.appquery-platform.aoav3.backfill-34,
platform-data.appquery-platform.aoav3.backfill-37,
platform-data.appquery-platform.aoav3.backfill-40,
platform-data.appquery-platform.aoav3.backfill-43,
platform-data.appquery-platform.aoav3.backfill-46,
platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}

is not generated by the fs2-kafka wrapper. Based on the kafka-clients code,
these are the log messages generated during the initial partition
assignment when my application connects to the Kafka brokers. If this list
contained the full list of partitions shown in the kafka-consumer-groups
output but the lag was increasing on 2 partitions, I would immediately
suspect the fs2-kafka wrapper as the issue. The fact that the notification
messages from the kafka-clients library to the fs2-kafka library are
missing two partitions makes me suspect the issue is in the kafka-clients
library. In this occurrence, it happened on 2 of the 5 consumer instances.
The version of the kafka-clients library used by the version of the
fs2-kafka library for this test is *2.8.1*. I'm currently running another
test with the latest fs2-kafka library, which consumes the *3.1.0* version
of the kafka-clients library. Initial partition assignment was successful.
On Monday I'll run a large number of scale-up/scale-down tests to force
partition rebalancing and see if I can replicate the issue with the latest
version.
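For anyone following along, the dependency override mentioned above can be
done in sbt roughly like this. This is only a sketch: the module
coordinates are the published Maven Central names, and the version numbers
are the ones discussed in this thread.

```scala
// build.sbt -- force a newer kafka-clients underneath fs2-kafka.
// Sketch only; adjust versions to whatever you are actually testing.
libraryDependencies += "com.github.fd4s" %% "fs2-kafka" % "2.4.0"

// Override the transitive kafka-clients 2.8.1 with 3.1.0.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "3.1.0"
```

Whether the newer client behaves correctly under the older wrapper is
exactly what the scale-up/scale-down tests should show.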

On Sat, Mar 19, 2022 at 2:06 AM Liam Clarke-Hutchinson <lclar...@redhat.com>
wrote:

> So to clarify, you're using kafka-clients directly? Or via fs2-kafka? If
> it's kafka-clients directly, what version please?
>
> On Sat, 19 Mar 2022 at 19:59, Richard Ney <kamisama....@gmail.com> wrote:
>
> > Hi Liam,
> >
> > Sorry for the mis-identification in the last email. That's the fun of
> > answering an email on a phone instead of a desktop. I confirmed the log
> > messages I included in the message come from this location in the
> > `kafka-clients` library:
> >
> >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L422
> >
> > And it's only including 8 of the 10 partitions that were assigned to that
> > consumer instance.
> >
> > -Richard
> >
> > On Fri, Mar 18, 2022 at 11:15 PM Richard Ney <kamisama....@gmail.com>
> > wrote:
> >
> > > Hi Ngā mihi,
> > >
> > > I believe the log entry I included was from the underlying
> > > kafka-clients library, given that the logger identified is
> > > “org.apache.kafka.clients.consumer.internals.ConsumerCoordinator”. I'll
> > > admit at first I thought it might also be the fs2-kafka wrapper, given
> > > that the 2.4.0 version is the first version with correct support for
> > > the messaging from the ConsumerCoordinator. I am planning to do a test
> > > with the 3.0.0-M5 version, which is built on the updated 3.1.0
> > > kafka-clients library, and will let the list know.
> > >
> > > -Richard Ney
> > >
> > > Sent from my iPhone
> > >
> > > > On Mar 18, 2022, at 10:55 PM, Liam Clarke-Hutchinson <
> > > lclar...@redhat.com> wrote:
> > > >
> > > > Kia ora Richard,
> > > >
> > > > I see support for the Cooperative Sticky Assignor in fs2-kafka is
> > > > quite new. Have you discussed this issue with the community of that
> > > > client at all? I ask because I see on GitHub that fs2-kafka is using
> > > > kafka-clients 2.8.1 as the underlying client, and there have been a
> > > > fair few bugfixes around the cooperative sticky assignor since that
> > > > version.
> > > >
> > > > Could you perhaps try overriding the kafka-clients dependency of
> > > > fs2-kafka and try a higher version, perhaps 3.1.0, to see if the
> > > > issue remains? I'm not sure how well that'll work, but it might help
> > > > narrow down the issue.
> > > >
> > > > Ngā mihi,
> > > >
> > > > Liam Clarke-Hutchinson
> > > >
> > > >> On Sat, 19 Mar 2022 at 14:24, Richard Ney <kamisama....@gmail.com>
> > > wrote:
> > > >>
> > > >> Thanks for the additional information, Bruno. Does this look like a
> > > >> possible bug in the CooperativeStickyAssignor? I have 5 consumers
> > > >> reading from a 50-partition topic. Based on the log messages, this
> > > >> application instance is only getting assigned 8 partitions, but
> > > >> when I ask the consumer group for LAG information, the consumer
> > > >> group thinks the correct number of 10 partitions were assigned. As
> > > >> a result, 2 partitions aren't getting read because the application
> > > >> doesn't know it has them.
> > > >>
> > > >> {"timestamp":"2022-03-19T00:54:46.025Z","message":"[Consumer
> > > >> instanceId=i-0e89c9bee06f71f68,
> > > >>
> > >
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > >> groupId=app-query-platform-aoa-backfill-v7] Updating assignment
> > with\n\t
> > > >> Assigned partitions: [
> > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > >> platform-data.appquery-platform.aoav3.backfill-37] \n\t
> > > >> Current owned partitions:                  []\n\t
> > > >>
> > > >> Added partitions (assigned - owned):       [
> > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > >> platform-data.appquery-platform.aoav3.backfill-37]\n\t
> > > >>
> > > >> Revoked partitions (owned - assigned):
> > > >>
> > > >>
> > >
> >
> []\n","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > >>
> > > >>
> > > >> {"timestamp":"2022-03-19T00:54:46.026Z","message":"[Consumer
> > > >> instanceId=i-0e89c9bee06f71f68,
> > > >>
> > >
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > >> groupId=app-query-platform-aoa-backfill-v7]
> > > >>
> > > >> Notifying assignor about the new Assignment(partitions=[
> > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > >> platform-data.appquery-platform.aoav3.backfill-37,
> > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > >>
> > > >>
> > >
> >
> platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > >>
> > > >> {"timestamp":"2022-03-19T00:54:46.028Z","message":"[Consumer
> > > >> instanceId=i-0e89c9bee06f71f68,
> > > >>
> > >
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > >> groupId=app-query-platform-aoa-backfill-v7]
> > > >>
> > > >> Adding newly assigned partitions:
> > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > >>
> > > >>
> > >
> >
> platform-data.appquery-platform.aoav3.backfill-37","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > >>
> > > >> *OUTPUT FROM `/usr/local/opt/kafka/bin/kafka-consumer-groups`*
> > > >>
> > > >> GROUP                              TOPIC
> > > >>       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
> > > >> CONSUMER-ID                                              HOST
> > > >> CLIENT-ID
> > > >> app-query-platform-aoa-backfill-v7
> > > >> platform-data.appquery-platform.aoav3.backfill 40         8369679
> > > >> 8369696         17
> > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > 10.123.16.69
> > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > >> app-query-platform-aoa-backfill-v7
> > > >> platform-data.appquery-platform.aoav3.backfill 37         8369643
> > > >> 8369658         15
> > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > 10.123.16.69
> > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > >> app-query-platform-aoa-backfill-v7
> > > >> platform-data.appquery-platform.aoav3.backfill 46         8368044
> > > >> 8368055         11
> > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > 10.123.16.69
> > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > >> app-query-platform-aoa-backfill-v7
> > > >> platform-data.appquery-platform.aoav3.backfill 34         8379346
> > > >> 8379358         12
> > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > 10.123.16.69
> > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > >> app-query-platform-aoa-backfill-v7
> > > >> platform-data.appquery-platform.aoav3.backfill 28         8374244
> > > >> 8374247         3
> > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > 10.123.16.69
> > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > >> app-query-platform-aoa-backfill-v7
> > > >> platform-data.appquery-platform.aoav3.backfill 49         8364656
> > > >> 8364665         9
> > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > 10.123.16.69
> > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > >> app-query-platform-aoa-backfill-v7
> > > >> platform-data.appquery-platform.aoav3.backfill 43         8369980
> > > >> 8369988         8
> > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > 10.123.16.69
> > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > >> app-query-platform-aoa-backfill-v7
> > > >> platform-data.appquery-platform.aoav3.backfill 25         8369261
> > > >> 8370063         802
> > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > 10.123.16.69
> > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > >> app-query-platform-aoa-backfill-v7
> > > >> platform-data.appquery-platform.aoav3.backfill 31         8368087
> > > >> 8368097         10
> > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > 10.123.16.69
> > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > >> app-query-platform-aoa-backfill-v7
> > > >> platform-data.appquery-platform.aoav3.backfill 22         8370475
> > > >> 8371319         844
> > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > 10.123.16.69
> > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > >>
> > > >>> On Thu, Mar 17, 2022 at 2:29 AM Bruno Cadonna <cado...@apache.org>
> > > wrote:
> > > >>>
> > > >>> Hi Richard,
> > > >>>
> > > >>> The group.instance.id config is orthogonal to the partition
> > > >>> assignment strategy. The group.instance.id is used if you want
> > > >>> static membership, which is not related to the partition
> > > >>> assignment strategy.
> > > >>>
> > > >>> If you think you found a bug, could you please open a JIRA ticket
> > > >>> with steps to reproduce it?
> > > >>>
> > > >>> Best,
> > > >>> Bruno
> > > >>>
> > > >>> On 16.03.22 10:01, Luke Chen wrote:
> > > >>>> Hi Richard,
> > > >>>>
> > > >>>> Right, you are not missing any settings beyond the partition
> > > >>>> assignment strategy and the group instance id.
> > > >>>> To troubleshoot, you might need to find out from the logs why the
> > > >>>> rebalance was triggered.
> > > >>>>
> > > >>>> Thank you.
> > > >>>> Luke
> > > >>>>
> > > >>>> On Wed, Mar 16, 2022 at 3:02 PM Richard Ney <
> kamisama....@gmail.com
> > >
> > > >>> wrote:
> > > >>>>
> > > >>>>> Hi Luke,
> > > >>>>>
> > > >>>>> I did end up with a situation where I had two instances
> > > >>>>> connecting to the same consumer group and they ended up in a
> > > >>>>> rebalance trade-off. All partitions kept going back and forth
> > > >>>>> between the two microservice instances. That was a test case
> > > >>>>> where I'd removed the Group Instance Id setting to see what
> > > >>>>> would happen. I stabilized that one by reducing it to a single
> > > >>>>> consumer after 20+ rebalances.
> > > >>>>>
> > > >>>>> The other issue I'm seeing may be a bug in the Functional Scala
> > > >>>>> `fs2-kafka` wrapper, where I see the partitions cleanly assigned
> > > >>>>> but one or more instances aren't ingesting. I found out that
> > > >>>>> they recently added support for the cooperative sticky assignor
> > > >>>>> in the stream recreation, since they were previously assuming a
> > > >>>>> full revocation of the partitions.
> > > >>>>>
> > > >>>>> So I basically wanted to make sure I wasn't missing any
> > > >>>>> settings beyond the partition assignment strategy and the group
> > > >>>>> instance id.
> > > >>>>>
> > > >>>>> -Richard
> > > >>>>>
> > > >>>>> On Tue, Mar 15, 2022 at 11:27 PM Luke Chen <show...@gmail.com>
> > > wrote:
> > > >>>>>
> > > >>>>>> Hi Richard,
> > > >>>>>>
> > > >>>>>> To use `CooperativeStickyAssignor`, no other special
> > > >>>>>> configuration is required.
> > > >>>>>>
> > > >>>>>> I'm not sure what `make the rebalance happen cleanly` means.
> > > >>>>>> Did you find any problems during the group rebalance?
> > > >>>>>>
> > > >>>>>> Thank you.
> > > >>>>>> Luke
> > > >>>>>>
> > > >>>>>> On Wed, Mar 16, 2022 at 1:00 PM Richard Ney <
> > > richard....@lookout.com
> > > >>>>>> .invalid>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Trying to find a good sample of what consumer settings,
> > > >>>>>>> besides setting
> > > >>>>>>>
> > > >>>>>>> ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG to
> > > >>>>>>> org.apache.kafka.clients.consumer.CooperativeStickyAssignor
> > > >>>>>>>
> > > >>>>>>> are needed to make the rebalance happen cleanly. I'm unable to
> > > >>>>>>> find any decent documentation or code samples. I have set the
> > > >>>>>>> Group Instance Id to the EC2 instance id based on one blog
> > > >>>>>>> write-up I found.
> > > >>>>>>>
> > > >>>>>>> Any help would be appreciated
> > > >>>>>>>
> > > >>>>>>> -Richard
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
>
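As a footnote for anyone landing on this thread from a search: the net of
the discussion is that only two consumer settings are involved, the
assignment strategy and the group instance id. Here's a minimal sketch in
plain Java using the raw config key strings (equivalent to the
`ConsumerConfig` constants named above) so it stands alone without the
kafka-clients jar; the instance id value is a made-up EC2 instance id.

```java
import java.util.Properties;

public class ConsumerProps {
    // Build the two consumer settings discussed in the thread:
    // cooperative sticky assignment plus static membership.
    static Properties cooperativeStickyProps(String instanceId) {
        Properties props = new Properties();
        // Raw key for ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        // Raw key for ConsumerConfig.GROUP_INSTANCE_ID_CONFIG; this is what
        // enables static membership, orthogonal to the assignment strategy.
        props.put("group.instance.id", instanceId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = cooperativeStickyProps("i-0123456789abcdef0");
        // prints org.apache.kafka.clients.consumer.CooperativeStickyAssignor
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

In a real application these properties would be merged into the consumer
settings of whatever client wrapper is in use (fs2-kafka's
`ConsumerSettings.withProperties`, for example).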