Hi Liam,

I've gotten the cooperative sticky assignor to work with the latest
fs2-kafka wrapper. There was a bug in my code: the `.parJoinUnbounded`
that processes the partition streams needed to move out one level of scope
so that it also pulled in the notification message stream. It's possible
that version 2.4.0 of the fs2-kafka wrapper would also work with my fix.

"timestamp":"2022-03-25T17:22:08.736Z"
Updating assignment with
Assigned partitions:                       [dw_notifications_aoa_v3-8, dw_notifications_aoa_v3-7]
Current owned partitions:                  [dw_notifications_aoa_v3-9, dw_notifications_aoa_v3-8, dw_notifications_aoa_v3-7]
Added partitions (assigned - owned):       []
Revoked partitions (owned - assigned):     [dw_notifications_aoa_v3-9]

"timestamp":"2022-03-25T17:25:33.721Z"
Updating assignment with
Assigned partitions:                       [dw_notifications_aoa_v3-4, dw_notifications_aoa_v3-2, dw_notifications_aoa_v3-0, dw_notifications_aoa_v3-8, dw_notifications_aoa_v3-7]
Current owned partitions:                  [dw_notifications_aoa_v3-8, dw_notifications_aoa_v3-7]
Added partitions (assigned - owned):       [dw_notifications_aoa_v3-4, dw_notifications_aoa_v3-2, dw_notifications_aoa_v3-0]
Revoked partitions (owned - assigned):     []

{"timestamp":"2022-03-25T17:25:33.725Z","message":"New partition assigned to consumer for Topic: dw_notifications_aoa_v3:0",
{"timestamp":"2022-03-25T17:25:33.728Z","message":"New partition assigned to consumer for Topic: dw_notifications_aoa_v3:2",
{"timestamp":"2022-03-25T17:25:33.729Z","message":"New partition assigned to consumer for Topic: dw_notifications_aoa_v3:4",
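
The "Added" and "Revoked" lines in the two "Updating assignment with" entries are plain set differences between the newly assigned and the currently owned partitions. That is how the cooperative protocol can give up partition 9 in one rebalance and pick up 0, 2 and 4 in a later one without ever touching 7 and 8. Below is a minimal Java sketch that reproduces both lines from the partition numbers logged above. It is illustrative only, not the kafka-clients implementation, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of kafka-clients: recomputes the
// "Added" and "Revoked" lines of an "Updating assignment with" log entry.
public class AssignmentDiff {

    // Set difference a - b, kept sorted so the printed output is stable.
    static Set<Integer> minus(Set<Integer> a, Set<Integer> b) {
        Set<Integer> result = new TreeSet<>(a);
        result.removeAll(b);
        return result;
    }

    static void report(String label, Set<Integer> assigned, Set<Integer> owned) {
        System.out.println(label);
        System.out.println("  Added (assigned - owned):   " + minus(assigned, owned));
        System.out.println("  Revoked (owned - assigned): " + minus(owned, assigned));
    }

    public static void main(String[] args) {
        // Partition numbers of dw_notifications_aoa_v3 from the 17:22:08 entry.
        report("17:22:08",
                new TreeSet<>(Arrays.asList(8, 7)),
                new TreeSet<>(Arrays.asList(9, 8, 7)));
        // Partition numbers from the 17:25:33 entry.
        report("17:25:33",
                new TreeSet<>(Arrays.asList(4, 2, 0, 8, 7)),
                new TreeSet<>(Arrays.asList(8, 7)));
    }
}
```

For the 17:22:08 entry it prints `[]` as added and `[9]` as revoked. For the 17:25:33 entry it prints `[0, 2, 4]` as added and `[]` as revoked, which matches the log.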

On Sat, Mar 19, 2022 at 6:00 PM Liam Clarke-Hutchinson <lclar...@redhat.com> wrote:

> Hi Richard,
>
> Yeah, what I think might be the issue is the old 2.8.1 version of
> kafka-clients used by trunk fs2-kafka, not the wrapper itself. Sorry if I
> was unclear on that.
>
> Please let us know how your testing with the latest fs2-kafka that's using
> 3.1.0 goes. :)
>
> Kind regards,
>
> Liam Clarke-Hutchinson
>
> On Sun, 20 Mar 2022 at 07:19, Richard Ney <kamisama....@gmail.com> wrote:
>
> > I am using kafka-clients through the fs2-kafka wrapper. However, the log
> > message I posted earlier, copied again here,
> >
> > Notifying assignor about the new Assignment(partitions=[
> > platform-data.appquery-platform.aoav3.backfill-28,
> > platform-data.appquery-platform.aoav3.backfill-31,
> > platform-data.appquery-platform.aoav3.backfill-34,
> > platform-data.appquery-platform.aoav3.backfill-37,
> > platform-data.appquery-platform.aoav3.backfill-40,
> > platform-data.appquery-platform.aoav3.backfill-43,
> > platform-data.appquery-platform.aoav3.backfill-46,
> > platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> >
> > is not generated by the fs2-kafka wrapper. Based on the kafka-clients
> > code, these are the log messages generated during the initial partition
> > assignment when my application connects to the Kafka brokers. If this
> > list contained all of the partitions shown in the kafka-consumer-groups
> > output but the lag was still increasing on 2 partitions, I would
> > immediately suspect the fs2-kafka wrapper. The fact that the
> > notifications from the kafka-clients library to the fs2-kafka library
> > are missing two partitions makes me suspect the issue is in the
> > kafka-clients library. In this occurrence, it happened on 2 of the 5
> > consumer instances. The kafka-clients version used by the fs2-kafka
> > release in this test is *2.8.1*. I'm currently running another test
> > with the latest fs2-kafka library, which depends on kafka-clients
> > *3.1.0*. Initial partition assignment was successful. On Monday I'll run
> > a large number of scale-up/scale-down tests to force partition
> > rebalancing and see if I can replicate the issue with the latest
> > version.
> >
> > On Sat, Mar 19, 2022 at 2:06 AM Liam Clarke-Hutchinson <lclar...@redhat.com> wrote:
> >
> > > So to clarify, you're using kafka-clients directly? Or via fs2-kafka?
> > > If it's kafka-clients directly, what version please?
> > >
> > > On Sat, 19 Mar 2022 at 19:59, Richard Ney <kamisama....@gmail.com> wrote:
> > >
> > > > Hi Liam,
> > > >
> > > > Sorry for the misidentification in my last email; that's the fun of
> > > > answering email on a phone instead of a desktop. I confirmed that the
> > > > upper log messages I included come from this location in the
> > > > `kafka-clients` library:
> > > >
> > > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L422
> > > >
> > > > And it's only including 8 of the 10 partitions that were assigned to
> > > > that consumer instance.
> > > >
> > > > -Richard
> > > >
> > > > On Fri, Mar 18, 2022 at 11:15 PM Richard Ney <kamisama....@gmail.com> wrote:
> > > >
> > > > > Hi Ngā mihi,
> > > > >
> > > > > I believe the log entry I included came from the underlying
> > > > > kafka-clients library, given that the logger is
> > > > > “org.apache.kafka.clients.consumer.internals.ConsumerCoordinator”.
> > > > > I’ll admit that at first I also thought it might be the fs2-kafka
> > > > > wrapper, given that version 2.4.0 is the first version with correct
> > > > > support for the messages from the ConsumerCoordinator. I am planning
> > > > > to test with version 3.0.0-M5, which is built on the updated 3.1.0
> > > > > kafka-clients library, and will let the list know.
> > > > >
> > > > > -Richard Ney
> > > > >
> > > > > Sent from my iPhone
> > > > >
> > > > > > On Mar 18, 2022, at 10:55 PM, Liam Clarke-Hutchinson <lclar...@redhat.com> wrote:
> > > > > >
> > > > > > Kia ora Richard,
> > > > > >
> > > > > > I see support for the Cooperative Sticky Assignor in fs2-kafka is
> > > > > > quite new. Have you discussed this issue with that client's
> > > > > > community at all? I ask because I see on GitHub that fs2-kafka uses
> > > > > > kafka-clients 2.8.1 as the underlying client, and there have been a
> > > > > > fair few bugfixes around the cooperative sticky assignor since that
> > > > > > version.
> > > > > >
> > > > > > Could you perhaps try overriding fs2-kafka's kafka-clients
> > > > > > dependency with a higher version, perhaps 3.1.0, and see if the
> > > > > > issue remains? I'm not sure how well that'll work, but it might
> > > > > > help narrow down the issue.
> > > > > >
> > > > > > Ngā mihi,
> > > > > >
> > > > > > Liam Clarke-Hutchinson
> > > > > >
> > > > > >> On Sat, 19 Mar 2022 at 14:24, Richard Ney <kamisama....@gmail.com> wrote:
> > > > > >>
> > > > > >> Thanks for the additional information, Bruno. Does this look like a
> > > > > >> possible bug in the CooperativeStickyAssignor? I have 5 consumers
> > > > > >> reading from a 50-partition topic. Based on the log messages, this
> > > > > >> application instance is only being assigned 8 partitions, but when I
> > > > > >> ask the consumer group for LAG information, the group thinks the
> > > > > >> correct number of 10 partitions were assigned. As a result, 2
> > > > > >> partitions aren't being read because the application doesn't know
> > > > > >> it has them.
> > > > > >>
> > > > > >> {"timestamp":"2022-03-19T00:54:46.025Z","message":"[Consumer
> > > > > >> instanceId=i-0e89c9bee06f71f68,
> > > > > >> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > > > >> groupId=app-query-platform-aoa-backfill-v7] Updating assignment with\n\t
> > > > > >> Assigned partitions: [
> > > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-37] \n\t
> > > > > >> Current owned partitions:                  []\n\t
> > > > > >> Added partitions (assigned - owned):       [
> > > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-37]\n\t
> > > > > >> Revoked partitions (owned - assigned):     []\n","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > > > >>
> > > > > >>
> > > > > >> {"timestamp":"2022-03-19T00:54:46.026Z","message":"[Consumer
> > > > > >> instanceId=i-0e89c9bee06f71f68,
> > > > > >> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > > > >> groupId=app-query-platform-aoa-backfill-v7]
> > > > > >> Notifying assignor about the new Assignment(partitions=[
> > > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-37,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > > > >>
> > > > > >> {"timestamp":"2022-03-19T00:54:46.028Z","message":"[Consumer
> > > > > >> instanceId=i-0e89c9bee06f71f68,
> > > > > >> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > > > >> groupId=app-query-platform-aoa-backfill-v7]
> > > > > >> Adding newly assigned partitions:
> > > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-37","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > > > >>
> > > > > >> *OUTPUT FROM `/usr/local/opt/kafka/bin/kafka-consumer-groups`*
> > > > > >>
> > > > > >> Columns identical in every row:
> > > > > >> GROUP        app-query-platform-aoa-backfill-v7
> > > > > >> TOPIC        platform-data.appquery-platform.aoav3.backfill
> > > > > >> CONSUMER-ID  i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941
> > > > > >> HOST         /10.123.16.69
> > > > > >> CLIENT-ID    consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > > >>
> > > > > >> PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
> > > > > >> 40         8369679         8369696         17
> > > > > >> 37         8369643         8369658         15
> > > > > >> 46         8368044         8368055         11
> > > > > >> 34         8379346         8379358         12
> > > > > >> 28         8374244         8374247         3
> > > > > >> 49         8364656         8364665         9
> > > > > >> 43         8369980         8369988         8
> > > > > >> 25         8369261         8370063         802
> > > > > >> 31         8368087         8368097         10
> > > > > >> 22         8370475         8371319         844
> > > > > >>
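The mismatch between the two views can be checked mechanically. The consumer-groups output attributes 10 partitions to this member (50 partitions over 5 consumers), while the assignor notification lists only 8. Below is a small Java sketch that computes which partitions the group thinks this member owns but the application was never told about. The partition numbers are copied from the log and table above. The class name is hypothetical, and this is a manual diagnostic, not a Kafka tool.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical diagnostic: partitions that kafka-consumer-groups attributes to
// this member but that never appeared in "Notifying assignor about the new Assignment".
public class MissingPartitions {

    // reportedByGroup - notified, sorted for readable output.
    static Set<Integer> missing(Set<Integer> reportedByGroup, Set<Integer> notified) {
        Set<Integer> result = new TreeSet<>(reportedByGroup);
        result.removeAll(notified);
        return result;
    }

    public static void main(String[] args) {
        // PARTITION column of the kafka-consumer-groups output for i-0e89c9bee06f71f68.
        Set<Integer> reportedByGroup = Set.of(40, 37, 46, 34, 28, 49, 43, 25, 31, 22);
        // Partitions listed in the ConsumerCoordinator notification log line.
        Set<Integer> notified = Set.of(28, 31, 34, 37, 40, 43, 46, 49);

        System.out.println("Group reports " + reportedByGroup.size()
                + " partitions, application was notified of " + notified.size());
        System.out.println("Never notified: " + missing(reportedByGroup, notified));
    }
}
```

It prints `Never notified: [22, 25]`. Those are exactly the two partitions with a lag of 844 and 802 in the table, while every partition the application was told about has a lag of 17 or less.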
> > > > > >>> On Thu, Mar 17, 2022 at 2:29 AM Bruno Cadonna <cado...@apache.org> wrote:
> > > > > >>>
> > > > > >>> Hi Richard,
> > > > > >>>
> > > > > >>> The group.instance.id config is orthogonal to the partition
> > > > > >>> assignment strategy. group.instance.id is used when you want
> > > > > >>> static membership, which is unrelated to the partition assignment
> > > > > >>> strategy.
> > > > > >>>
> > > > > >>> If you think you've found a bug, could you please open a JIRA
> > > > > >>> ticket with steps to reproduce it?
> > > > > >>>
> > > > > >>> Best,
> > > > > >>> Bruno
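For reference, the two settings discussed in this thread sit side by side in an ordinary consumer configuration and can be set independently of each other. Below is a minimal sketch using plain `java.util.Properties` with the documented string keys. `partition.assignment.strategy` is the key behind `ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG`, and `group.instance.id` is the key that enables static membership. The broker address is a placeholder, the class and method names are hypothetical, and in a real application these properties would be passed to a `KafkaConsumer` (directly or via a wrapper such as fs2-kafka).

```java
import java.util.Properties;

// Sketch of consumer settings only; no Kafka dependency is needed to build it.
// Class name, method name and the broker address are placeholders/assumptions.
public class CooperativeConsumerConfig {

    static Properties build(String bootstrapServers, String groupId, String instanceId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Assignment strategy: how partitions are spread across members and
        // whether rebalances are incremental (cooperative) or eager.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        // Static membership: a stable member identity (here an EC2 instance id),
        // so a member that restarts within session.timeout.ms does not trigger
        // a rebalance. This is independent of the assignment strategy above.
        props.setProperty("group.instance.id", instanceId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(
                "localhost:9092",                     // placeholder broker address
                "app-query-platform-aoa-backfill-v7", // group id from the logs above
                "i-0e89c9bee06f71f68");               // instance id from the logs above
        props.stringPropertyNames().stream().sorted()
                .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

Dropping the `group.instance.id` line leaves the cooperative strategy untouched and only turns the member back into a dynamic one. That separation is the point Bruno makes about the two configs being orthogonal.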
> > > > > >>>
> > > > > >>> On 16.03.22 10:01, Luke Chen wrote:
> > > > > >>>> Hi Richard,
> > > > > >>>>
> > > > > >>>> Right, you are not missing any settings beyond the partition
> > > > > >>>> assignment strategy and the group instance id.
> > > > > >>>> To troubleshoot, you'll probably need to find out from the logs
> > > > > >>>> why the rebalance was triggered.
> > > > > >>>>
> > > > > >>>> Thank you.
> > > > > >>>> Luke
> > > > > >>>>
> > > > > >>>> On Wed, Mar 16, 2022 at 3:02 PM Richard Ney <kamisama....@gmail.com> wrote:
> > > > > >>>>
> > > > > >>>>> Hi Luke,
> > > > > >>>>>
> > > > > >>>>> I did end up in a situation where two instances connected to the
> > > > > >>>>> same consumer group and got stuck in a rebalance tug-of-war: all
> > > > > >>>>> partitions kept moving back and forth between the two
> > > > > >>>>> microservice instances. That was a test case where I'd removed
> > > > > >>>>> the Group Instance Id setting to see what would happen. I
> > > > > >>>>> stabilized it by reducing to a single consumer after 20+
> > > > > >>>>> rebalances.
> > > > > >>>>>
> > > > > >>>>> The other issue I'm seeing may be a bug in the Functional Scala
> > > > > >>>>> `fs2-kafka` wrapper: the partitions are cleanly assigned, but one
> > > > > >>>>> or more instances aren't ingesting. I found out that they only
> > > > > >>>>> recently added support for the cooperative sticky assignor to
> > > > > >>>>> their stream recreation, since it previously assumed a full
> > > > > >>>>> revocation of the partitions.
> > > > > >>>>>
> > > > > >>>>> So I basically wanted to make sure I wasn't missing any settings
> > > > > >>>>> beyond the partition assignment strategy and the group instance
> > > > > >>>>> id.
> > > > > >>>>>
> > > > > >>>>> -Richard
> > > > > >>>>>
> > > > > >>>>> On Tue, Mar 15, 2022 at 11:27 PM Luke Chen <show...@gmail.com> wrote:
> > > > > >>>>>
> > > > > >>>>>> Hi Richard,
> > > > > >>>>>>
> > > > > >>>>>> To use `CooperativeStickyAssignor`, no other special
> > > > > >>>>>> configuration is required.
> > > > > >>>>>>
> > > > > >>>>>> I'm not sure what `make the rebalance happen cleanly` means.
> > > > > >>>>>> Did you run into any problem during the group rebalance?
> > > > > >>>>>>
> > > > > >>>>>> Thank you.
> > > > > >>>>>> Luke
> > > > > >>>>>>
> > > > > >>>>>> On Wed, Mar 16, 2022 at 1:00 PM Richard Ney <richard....@lookout.com.invalid> wrote:
> > > > > >>>>>>
> > > > > >>>>>>> I'm trying to find a good example of which consumer settings,
> > > > > >>>>>>> besides setting
> > > > > >>>>>>>
> > > > > >>>>>>> ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG to
> > > > > >>>>>>> org.apache.kafka.clients.consumer.CooperativeStickyAssignor
> > > > > >>>>>>>
> > > > > >>>>>>> are needed to make the rebalance happen cleanly. I haven't been
> > > > > >>>>>>> able to find any decent documentation or code samples. I have
> > > > > >>>>>>> set the Group Instance Id to the EC2 instance ID based on one
> > > > > >>>>>>> blog write-up I found.
> > > > > >>>>>>>
> > > > > >>>>>>> Any help would be appreciated.
> > > > > >>>>>>>
> > > > > >>>>>>> -Richard
> > > > > >>>>>>>