Hi Liam, ngā mihi.

I believe the log entries I included came from the underlying kafka-clients library, since the logger name is “org.apache.kafka.clients.consumer.internals.ConsumerCoordinator”. I'll admit that at first I suspected the fs2-kafka wrapper as well, given that 2.4.0 is the first version with correct support for the messaging from the ConsumerCoordinator. I plan to run a test with fs2-kafka 3.0.0-M5, which is built on the newer kafka-clients 3.1.0, and will report the results to the list.
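One way to confirm which kafka-clients version actually ends up on the classpath after switching to fs2-kafka 3.0.0-M5 (or after overriding the dependency) is to read the version resource bundled in the kafka-clients jar. This is a minimal sketch, not part of either library's API. It assumes the jar ships `kafka/kafka-version.properties` with a `version` key, which is the resource `org.apache.kafka.common.utils.AppInfoParser` reads. The class name `KafkaClientsVersion` is hypothetical, and the resource lookup is used only so the sketch compiles without the dependency.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
import java.util.Properties;

public class KafkaClientsVersion {
    // Assumption: kafka-clients jars bundle this resource (it is what
    // org.apache.kafka.common.utils.AppInfoParser reads for its version string).
    static final String VERSION_RESOURCE = "/kafka/kafka-version.properties";

    /** Returns the kafka-clients version found on the classpath, or empty if none is found. */
    static Optional<String> kafkaClientsVersion() {
        // A null stream (resource absent) is allowed in try-with-resources; close() is skipped.
        try (InputStream in = KafkaClientsVersion.class.getResourceAsStream(VERSION_RESOURCE)) {
            if (in == null) {
                return Optional.empty(); // kafka-clients is not on the classpath
            }
            Properties props = new Properties();
            props.load(in);
            return Optional.ofNullable(props.getProperty("version")).map(String::trim);
        } catch (IOException e) {
            return Optional.empty(); // unreadable resource: treat the version as unknown
        }
    }

    public static void main(String[] args) {
        System.out.println("kafka-clients version: " + kafkaClientsVersion().orElse("not found"));
    }
}
```

With kafka-clients on the classpath, calling `AppInfoParser.getVersion()` directly should give the same answer, and the client also logs a "Kafka version: ..." line at INFO level when a consumer starts.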
-Richard Ney

> On Mar 18, 2022, at 10:55 PM, Liam Clarke-Hutchinson wrote:
>
> Kia ora Richard,
>
> I see support for the Cooperative Sticky Assignor in fs2-kafka is quite
> new. Have you discussed this issue with the community of that client at
> all? I ask because I see on GitHub that fs2-kafka is using kafka-clients
> 2.8.1 as the underlying client, and there have been a fair few bug fixes
> around the cooperative sticky assignor since that version.
>
> Could you perhaps try overriding the kafka-clients dependency of fs2-kafka
> with a higher version, perhaps 3.1.0, and see if the issue remains? I'm
> not sure how well that'll work, but it might help narrow down the issue.
>
> Ngā mihi,
>
> Liam Clarke-Hutchinson
>
>> On Sat, 19 Mar 2022 at 14:24, Richard Ney wrote:
>>
>> Thanks for the additional information, Bruno. Does this look like a
>> possible bug in the CooperativeStickyAssignor? I have 5 consumers reading
>> from a 50-partition topic. Based on the log messages, this application
>> instance is only being assigned 8 partitions, but when I ask the consumer
>> group for lag information, the group reports the expected 10 partitions
>> assigned to this instance. As a result, 2 partitions (22 and 25) aren't
>> being read, because the application doesn't know it owns them.
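The mismatch Richard describes can be checked mechanically by diffing the partitions in the ConsumerCoordinator "Assigned partitions" log entry against the partitions that `kafka-consumer-groups` attributes to the same consumer ID, both quoted below. A minimal Java sketch, with the partition numbers copied by hand from that output; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class MissingPartitions {
    /** Partitions the consumer-groups tool attributes to a member that the member never logged as assigned. */
    static Set<Integer> missing(List<Integer> loggedAssigned, List<Integer> groupReported) {
        Set<Integer> result = new TreeSet<>(groupReported); // sorted for readable output
        result.removeAll(loggedAssigned);
        return result;
    }

    public static void main(String[] args) {
        // From the "Assigned partitions" list in the ConsumerCoordinator log entry.
        List<Integer> logged = List.of(28, 43, 31, 46, 34, 49, 40, 37);
        // From the PARTITION column of the kafka-consumer-groups output for the same CONSUMER-ID.
        List<Integer> reported = List.of(40, 37, 46, 34, 28, 49, 43, 25, 31, 22);
        System.out.println("Owned per group, unknown to the app: " + missing(logged, reported));
    }
}
```

Run as-is, it prints `[22, 25]`. Those are also the only two rows in the tool output whose lag is in the hundreds rather than below 20, which fits the idea that nothing is consuming them.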
>>
>> {"timestamp":"2022-03-19T00:54:46.025Z","message":"[Consumer
>> instanceId=i-0e89c9bee06f71f68,
>> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
>> groupId=app-query-platform-aoa-backfill-v7] Updating assignment with\n\t
>> Assigned partitions: [
>> platform-data.appquery-platform.aoav3.backfill-28,
>> platform-data.appquery-platform.aoav3.backfill-43,
>> platform-data.appquery-platform.aoav3.backfill-31,
>> platform-data.appquery-platform.aoav3.backfill-46,
>> platform-data.appquery-platform.aoav3.backfill-34,
>> platform-data.appquery-platform.aoav3.backfill-49,
>> platform-data.appquery-platform.aoav3.backfill-40,
>> platform-data.appquery-platform.aoav3.backfill-37]\n\t
>> Current owned partitions: []\n\t
>> Added partitions (assigned - owned): [
>> platform-data.appquery-platform.aoav3.backfill-28,
>> platform-data.appquery-platform.aoav3.backfill-43,
>> platform-data.appquery-platform.aoav3.backfill-31,
>> platform-data.appquery-platform.aoav3.backfill-46,
>> platform-data.appquery-platform.aoav3.backfill-34,
>> platform-data.appquery-platform.aoav3.backfill-49,
>> platform-data.appquery-platform.aoav3.backfill-40,
>> platform-data.appquery-platform.aoav3.backfill-37]\n\t
>> Revoked partitions (owned - assigned): []\n",
>> "logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
>> "thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>>
>> {"timestamp":"2022-03-19T00:54:46.026Z","message":"[Consumer
>> instanceId=i-0e89c9bee06f71f68,
>> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
>> groupId=app-query-platform-aoa-backfill-v7] Notifying assignor about the
>> new Assignment(partitions=[
>> platform-data.appquery-platform.aoav3.backfill-28,
>> platform-data.appquery-platform.aoav3.backfill-31,
>> platform-data.appquery-platform.aoav3.backfill-34,
>> platform-data.appquery-platform.aoav3.backfill-37,
>> platform-data.appquery-platform.aoav3.backfill-40,
>> platform-data.appquery-platform.aoav3.backfill-43,
>> platform-data.appquery-platform.aoav3.backfill-46,
>> platform-data.appquery-platform.aoav3.backfill-49])",
>> "logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
>> "thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>>
>> {"timestamp":"2022-03-19T00:54:46.028Z","message":"[Consumer
>> instanceId=i-0e89c9bee06f71f68,
>> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
>> groupId=app-query-platform-aoa-backfill-v7] Adding newly assigned
>> partitions:
>> platform-data.appquery-platform.aoav3.backfill-28,
>> platform-data.appquery-platform.aoav3.backfill-43,
>> platform-data.appquery-platform.aoav3.backfill-31,
>> platform-data.appquery-platform.aoav3.backfill-46,
>> platform-data.appquery-platform.aoav3.backfill-34,
>> platform-data.appquery-platform.aoav3.backfill-49,
>> platform-data.appquery-platform.aoav3.backfill-40,
>> platform-data.appquery-platform.aoav3.backfill-37",
>> "logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
>> "thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>>
>> *OUTPUT FROM `/usr/local/opt/kafka/bin/kafka-consumer-groups`*
>>
>> GROUP        app-query-platform-aoa-backfill-v7
>> TOPIC        platform-data.appquery-platform.aoav3.backfill
>> CONSUMER-ID  i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941
>> HOST         /10.123.16.69
>> CLIENT-ID    consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
>> (these five columns have the same value in all ten rows)
>>
>> PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
>> 40         8369679         8369696         17
>> 37         8369643         8369658         15
>> 46         8368044         8368055         11
>> 34         8379346         8379358         12
>> 28         8374244         8374247         3
>> 49         8364656         8364665         9
>> 43         8369980         8369988         8
>> 25         8369261         8370063         802
>> 31         8368087         8368097         10
>> 22         8370475         8371319         844
>>
>>> On Thu, Mar 17, 2022 at 2:29 AM Bruno Cadonna wrote:
>>>
>>> Hi Richard,
>>>
>>> The group.instance.id config is orthogonal to the
>>> partition assignment strategy. The group.instance.id is used if you want
>>> static membership, which is not related to the partition assignment
>>> strategy.
>>>
>>> If you think you have found a bug, please open a JIRA ticket with steps
>>> to reproduce it.
>>>
>>> Best,
>>> Bruno
>>>
>>> On 16.03.22 10:01, Luke Chen wrote:
>>>> Hi Richard,
>>>>
>>>> Right, you are not missing any settings beyond the partition assignment
>>>> strategy and the group instance id.
>>>> To troubleshoot, you may need to check the log to find out why the
>>>> rebalance was triggered.
>>>>
>>>> Thank you.
>>>> Luke
>>>>
>>>> On Wed, Mar 16, 2022 at 3:02 PM Richard Ney wrote:
>>>>
>>>>> Hi Luke,
>>>>>
>>>>> I did end up with a situation where I had two instances connecting to
>>>>> the same consumer group, and they got stuck in a rebalance loop: all
>>>>> partitions kept going back and forth between the two microservice
>>>>> instances. That was a test case where I'd removed the Group Instance Id
>>>>> setting to see what would happen. I stabilized that one by reducing it
>>>>> to a single consumer after 20+ rebalances.
>>>>>
>>>>> The other issue I'm seeing may be a bug in the functional Scala
>>>>> `fs2-kafka` wrapper: the partitions are cleanly assigned, but one or
>>>>> more instances aren't ingesting. I found out that they only recently
>>>>> added support for the cooperative sticky assignor to the stream
>>>>> recreation logic, since it previously assumed a full revocation of the
>>>>> partitions.
>>>>>
>>>>> So basically I wanted to make sure I wasn't missing any settings beyond
>>>>> the partition assignment strategy and the group instance id.
>>>>>
>>>>> -Richard
>>>>>
>>>>> On Tue, Mar 15, 2022 at 11:27 PM Luke Chen wrote:
>>>>>
>>>>>> Hi Richard,
>>>>>>
>>>>>> To use `CooperativeStickyAssignor`, no other special configuration is
>>>>>> required.
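Following Bruno's and Luke's points, a consumer configuration for this setup needs only the assignor setting, with `group.instance.id` as a separate, optional static-membership setting. The sketch below uses plain `java.util.Properties` and the literal config keys (the string values behind `ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG` and `ConsumerConfig.GROUP_INSTANCE_ID_CONFIG`) so it compiles without kafka-clients. The class name and the bootstrap address are hypothetical placeholders, and deserializers and other application settings are omitted.

```java
import java.util.Properties;

public class CooperativeConsumerConfig {
    /**
     * Builds consumer properties that select the cooperative sticky assignor and, if an
     * instance id is given, enable static membership. The two settings are independent.
     */
    static Properties consumerProps(String bootstrapServers, String groupId, String instanceId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Same key as ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        if (instanceId != null) {
            // Same key as ConsumerConfig.GROUP_INSTANCE_ID_CONFIG; must be unique per consumer in the group.
            props.put("group.instance.id", instanceId);
        }
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address; group id and EC2 instance id are taken from the thread.
        Properties props = consumerProps("localhost:9092",
                "app-query-platform-aoa-backfill-v7", "i-0e89c9bee06f71f68");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Passing `null` for the instance id leaves the assignor unchanged and only turns off static membership, so a restarted instance rejoins as a new member and triggers a rebalance.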
>>>>>>
>>>>>> I'm not sure what `make the rebalance happen cleanly` means.
>>>>>> Did you run into any problems during the group rebalance?
>>>>>>
>>>>>> Thank you.
>>>>>> Luke
>>>>>>
>>>>>> On Wed, Mar 16, 2022 at 1:00 PM Richard Ney wrote:
>>>>>>
>>>>>>> I'm trying to find a good sample of which consumer settings, besides
>>>>>>> setting
>>>>>>>
>>>>>>> ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG to
>>>>>>> org.apache.kafka.clients.consumer.CooperativeStickyAssignor
>>>>>>>
>>>>>>> are needed to make the rebalance happen cleanly. I'm unable to find
>>>>>>> any decent documentation or code samples. I have set the Group
>>>>>>> Instance Id to the EC2 instance id, based on one blog write-up I
>>>>>>> found.
>>>>>>>
>>>>>>> Any help would be appreciated.
>>>>>>>
>>>>>>> -Richard
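The ConsumerCoordinator entries quoted in the thread describe each assignment update as two set differences, "Added partitions (assigned - owned)" and "Revoked partitions (owned - assigned)". A small Java sketch of that arithmetic can help check what an application-level rebalance callback should have been told. The names are hypothetical, and this is an illustration of the logged formulas, not the coordinator's own code.

```java
import java.util.Set;
import java.util.TreeSet;

public class AssignmentDelta {
    /** Result of one assignment update: partitions to start consuming and to stop consuming. */
    static final class Delta {
        final Set<Integer> added;
        final Set<Integer> revoked;

        Delta(Set<Integer> added, Set<Integer> revoked) {
            this.added = added;
            this.revoked = revoked;
        }
    }

    /** Computes added = assigned - owned and revoked = owned - assigned, sorted for readable output. */
    static Delta delta(Set<Integer> owned, Set<Integer> assigned) {
        Set<Integer> added = new TreeSet<>(assigned);
        added.removeAll(owned);
        Set<Integer> revoked = new TreeSet<>(owned);
        revoked.removeAll(assigned);
        return new Delta(added, revoked);
    }

    public static void main(String[] args) {
        // Values from the quoted log entry: nothing owned yet, eight partitions assigned.
        Delta d = delta(Set.of(), Set.of(28, 43, 31, 46, 34, 49, 40, 37));
        System.out.println("added=" + d.added + " revoked=" + d.revoked);
    }
}
```

For the logged update it prints `added=[28, 31, 34, 37, 40, 43, 46, 49] revoked=[]`, matching the log. Partitions 22 and 25 appear in neither set on this member, which is consistent with the application never learning that it owns them.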
