Yes, version=0 should work. You might take a look at
org.apache.kafka.clients.consumer.internals.ConsumerProtocol if there are any
other details that aren't clear from the protocol wiki page.
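For reference, here is a minimal Python sketch of a version-0 consumer ProtocolMetadata (Subscription) as it might look on the wire. It assumes the layout from the protocol guide quoted further down (Version => int16, Subscription => [Topic], UserData => bytes) and Kafka's usual big-endian encodings (int16-length strings, int32-length arrays and bytes). The function names are illustrative only, and sending an empty UserData by default is an assumption; ConsumerProtocol remains the authority on the exact format.

```python
import struct
from typing import List

CONSUMER_PROTOCOL_V0 = 0  # the version value discussed in this thread


def encode_string(s: str) -> bytes:
    """Kafka 'string': int16 big-endian length, then UTF-8 bytes."""
    raw = s.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def encode_bytes(b: bytes) -> bytes:
    """Kafka 'bytes': int32 big-endian length, then the raw bytes."""
    return struct.pack(">i", len(b)) + b


def encode_subscription(topics: List[str], user_data: bytes = b"",
                        version: int = CONSUMER_PROTOCOL_V0) -> bytes:
    """ProtocolMetadata => Version Subscription UserData (hypothetical helper)."""
    out = struct.pack(">h", version)       # Version => int16
    out += struct.pack(">i", len(topics))  # [Topic] array count => int32
    for topic in topics:
        out += encode_string(topic)        # Topic => string
    out += encode_bytes(user_data)         # UserData => bytes (empty by default)
    return out


metadata = encode_subscription(["orders"])
print(metadata.hex())
```

The resulting bytes go into the ProtocolMetadata slot next to the AssignmentStrategy name in the JoinGroup request; the broker stores and forwards them without parsing.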

-Ewen

On Thu, Dec 24, 2015 at 3:32 AM, Oleksiy Krivoshey <oleks...@gmail.com>
wrote:

> Hi Ewen,
>
> Thanks for the detailed explanation. So it's basically the version of the
> MemberAssignment structure, not the version of the assignment strategy as I
> thought. Should I use version=0 in protocol exchanges for now? (I'm
> building a client in Node.js for 0.9:
> https://github.com/oleksiyk/kafka/blob/master/lib/group_consumer.js )
>
> On Thu, 24 Dec 2015 at 10:15 Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
> > Oleksiy,
> >
> > The join group protocol is general enough to handle multiple types of
> > group membership, not just consumers. This is used in Kafka Connect to
> > form a group of workers (the group splits connector tasks among its
> > members instead of topic partitions).
> >
> > In order to make this work and allow flexibility in how assignment is
> > handled, the protocol is divided into two layers. The primary join group
> > protocol only a) keeps track of group membership and b) selects a group
> > protocol that all members agree they can work with. At this level,
> > there's no version information, no info about consumer subscriptions, and
> > no knowledge of partition assignment strategies other than the names and
> > opaque metadata submitted by clients.
> >
> > The "embedded" layer is where the version info you're setting is
> > specified. This is never even parsed by the brokers -- the information is
> > collected and sent to one of the group members, which then decodes it and
> > determines the assignment. That result is then returned to the broker,
> > which disseminates it (and again, the broker never decodes this; it just
> > forwards the appropriate info to each member).
> >
> > The version is included specifically in the consumer protocol to allow us
> > to extend the format in the future. For example, if we needed to add or
> > change the way subscriptions are expressed, we could increase that
> > version number and update the message format. In other words, it is the
> > mechanism we have chosen *only for the consumer embedded protocol* to
> > allow metadata format changes. (Note that for the consumer embedded
> > protocol there is also *yet another* layer of data, called "UserData" in
> > that protocol documentation; this is custom data that the consumer's
> > pluggable partition assignment strategy might want to include, e.g. if
> > you were doing resource-based assignment you might need to include info
> > like the number of CPUs, which is specific to that assignment strategy.)
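To illustrate how the Version field lets the embedded format evolve while UserData stays opaque to everything except the assignment strategy, here is a hedged Python sketch of the decoding side. The forward-compatibility rule (read the v0 fields, ignore anything a newer version appends) and the CPU-count payload are assumptions made for illustration; the real rules live in ConsumerProtocol.

```python
import struct
from typing import List, NamedTuple, Tuple


class Subscription(NamedTuple):
    version: int
    topics: List[str]
    user_data: bytes


def decode_subscription(buf: bytes) -> Subscription:
    """Decode Version, Subscription and UserData; ignore any trailing bytes.

    Assumption: newer versions only append fields after the v0 ones, so an
    older reader can still parse the prefix it understands.
    """
    offset = 0

    def take(fmt: str) -> Tuple:
        nonlocal offset
        values = struct.unpack_from(fmt, buf, offset)
        offset += struct.calcsize(fmt)
        return values

    (version,) = take(">h")
    if version < 0:  # only negative versions are rejected in this sketch
        raise ValueError(f"unsupported subscription version: {version}")
    (count,) = take(">i")
    topics = []
    for _ in range(count):
        (length,) = take(">h")
        topics.append(buf[offset:offset + length].decode("utf-8"))
        offset += length
    (ud_len,) = take(">i")
    user_data = b"" if ud_len < 0 else buf[offset:offset + ud_len]
    return Subscription(version, topics, user_data)


# Hypothetical resource-based strategy: UserData carries the member's CPU count.
user_data = struct.pack(">i", 8)
raw = (struct.pack(">hi", 1, 1) + struct.pack(">h", 6) + b"orders"
       + struct.pack(">i", len(user_data)) + user_data
       + b"\x00\x01")  # pretend version 1 appended a field this reader doesn't know
sub = decode_subscription(raw)
cpus = struct.unpack(">i", sub.user_data)[0]
print(sub.version, sub.topics, cpus)
```

Only the strategy that wrote the UserData knows to read it as a CPU count; the generic decoder just hands the bytes through.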
> >
> > The broker only looks at the ProtocolName (which is equivalent to
> > AssignmentStrategy for consumers) when choosing which protocol to use for
> > consumers. If you want to version those in an incompatible way (i.e. you
> > can't handle the change just by updating the format of your metadata),
> > you should include version info in the ProtocolName itself to ensure the
> > group coordinator broker can differentiate them, e.g. round-robin vs
> > round-robin-2. But you should also think carefully about whether that
> > change is necessary -- in many cases, if you're not adding any metadata,
> > you'll be fine keeping the same name, since one member is selected to
> > perform the assignment and every other member just needs to respect
> > whatever assignment it makes. And of course, if you're just trying to
> > switch to a completely different assignment strategy (e.g. from range ->
> > round-robin), then the name itself is enough. Just bounce all consumers
> > adding round-robin as an option, then bounce them all again removing
> > range.
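The two-bounce switch works because the coordinator can only pick a name that every member lists. This small Python sketch, using hypothetical member ids and the strategy names from the paragraph above, checks that invariant at each step of the rolling restart.

```python
from typing import Dict, List, Set

Group = Dict[str, List[str]]  # member id -> strategy names, in preference order


def common_protocols(group: Group) -> Set[str]:
    """Names listed by every member; the coordinator can only choose among these."""
    name_lists = list(group.values())
    return set(name_lists[0]).intersection(*name_lists[1:])


# Hypothetical three-member group switching from "range" to "round-robin".
both = ["round-robin", "range"]
rolling_upgrade = [
    {"c1": ["range"], "c2": ["range"], "c3": ["range"]},  # start
    {"c1": both, "c2": ["range"], "c3": ["range"]},       # 1st bounce in progress
    {"c1": both, "c2": both, "c3": both},                 # 1st bounce done
    {"c1": ["round-robin"], "c2": both, "c3": both},      # 2nd bounce in progress
    {"c1": ["round-robin"], "c2": ["round-robin"], "c3": ["round-robin"]},  # done
]
for step, group in enumerate(rolling_upgrade):
    print(step, sorted(common_protocols(group)))

# Skipping the first bounce leaves no protocol that every member supports.
skipped = {"c1": ["round-robin"], "c2": ["range"], "c3": ["range"]}
print("skipped:", common_protocols(skipped))
```

If any step yields an empty set, the coordinator has no protocol it can choose for the whole group, which is why the second bounce (removing range) should start only after every member advertises round-robin.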
> >
> > We considered other options when designing this protocol, but decided
> > this was the best tradeoff. The current protocol is already pretty complex
> > and multi-layered, and the alternatives that tried to build in versioning
> > at this level too were even more complex and confusing.
> >
> > -Ewen
> >
> >
> >
> > On Wed, Dec 23, 2015 at 10:45 PM, Oleksiy Krivoshey <oleks...@gmail.com>
> > wrote:
> >
> > > Hi Ewen,
> > >
> > > I specify the version in the ProtocolMetadata structure, as per this
> > > document:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupResponse
> > >
> > > ---------------
> > > ProtocolType => "consumer"
> > >
> > > ProtocolName => AssignmentStrategy
> > >   AssignmentStrategy => string
> > >
> > > ProtocolMetadata => Version Subscription UserData
> > >   Version => int16
> > >   Subscription => [Topic]
> > >     Topic => string
> > >   UserData => bytes
> > > -----------------
> > >
> > > Maybe I misunderstood the purpose of this version field?
> > >
> > > On Thu, 24 Dec 2015 at 00:27 Ewen Cheslack-Postava <e...@confluent.io>
> > > wrote:
> > >
> > > > Oleksiy,
> > > >
> > > > Where are you specifying the version? Unless I'm missing something,
> > > > the JoinGroup protocol doesn't include versions, so I'm not sure I
> > > > understand the examples you are giving. Are the version numbers
> > > > included in the per-protocol metadata?
> > > >
> > > > You can see exactly how the consumer coordinator on the broker
> > > > selects the protocol here:
> > > >
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L179
> > > >
> > > > It just takes the candidate protocols (ones that are available for
> > > > all consumers), has each consumer "vote" by selecting whichever
> > > > candidate appears first in its list of strategies, then uses the one
> > > > with the most votes.
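The voting procedure described above can be sketched in Python as follows. This is a restatement for illustration, not a port of GroupMetadata.scala; in particular, the tie-breaking behaviour here is an assumption.

```python
from collections import Counter
from typing import Dict, List


def select_protocol(group: Dict[str, List[str]]) -> str:
    """Sketch of the coordinator's protocol choice, per the description above."""
    prefs = list(group.values())
    # 1. Candidates: protocol names supported by all members.
    candidates = set(prefs[0]).intersection(*prefs[1:])
    if not candidates:
        raise ValueError("no protocol is supported by every member")
    # 2. Each member votes for the first candidate in its own list.
    votes = Counter(next(p for p in plist if p in candidates) for plist in prefs)
    # 3. The candidate with the most votes wins (ties: first one counted here,
    #    which is not necessarily how the broker breaks them).
    return votes.most_common(1)[0][0]


print(select_protocol({"c1": ["round-robin", "range"],
                       "c2": ["round-robin", "range"],
                       "c3": ["range", "round-robin"]}))  # round-robin
print(select_protocol({"c1": ["round-robin", "range"],
                       "c2": ["range"]}))  # range
```

Note that a member's preference order only matters among the candidates: c1 in the second call prefers round-robin, but range is the only name every member lists.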
> > > >
> > > > Is it possible your example is behaving the way it is because it
> > > > actually has duplicates for "strategyX", and in the last case it
> > > > chooses the first strategyX despite the conflicting versions?
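The duplicate-name effect can be illustrated with a small Python model, assuming each member's protocols are kept as an ordered list of (name, metadata) pairs and the metadata for the selected name is taken from the first matching entry. Integers stand in for the metadata bytes, and the "-v1" style names are a hypothetical scheme in the spirit of round-robin vs round-robin-2.

```python
from typing import List, Set, Tuple

# (protocol name, version carried in that entry's metadata); ints stand in for bytes.
Entry = Tuple[str, int]


def metadata_for(entries: List[Entry], name: str) -> int:
    """Return the metadata of the FIRST entry with a matching name (assumed lookup rule)."""
    for entry_name, version in entries:
        if entry_name == name:
            return version
    raise KeyError(name)


def names(entries: List[Entry]) -> Set[str]:
    return {entry_name for entry_name, _ in entries}


# Same name twice: only one candidate name exists, and the first entry shadows the second.
consumer1 = [("strategyX", 10), ("strategyX", 1)]
consumer2 = [("strategyX", 20), ("strategyX", 1)]
print(metadata_for(consumer1, "strategyX"), metadata_for(consumer2, "strategyX"))  # 10 20

# Version folded into the name (hypothetical naming scheme): entries are now distinct.
consumer1 = [("strategyX-v10", 10), ("strategyX-v1", 1)]
consumer2 = [("strategyX-v20", 20), ("strategyX-v1", 1)]
common = names(consumer1) & names(consumer2)
print(common)  # {'strategyX-v1'}
print(metadata_for(consumer1, "strategyX-v1"), metadata_for(consumer2, "strategyX-v1"))  # 1 1
```

With distinct names, the coordinator only ever sees strategyX-v1 as common to both members, so the leader receives matching metadata from each.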
> > > >
> > > > -Ewen
> > > >
> > > > On Wed, Dec 23, 2015 at 9:44 AM, Oleksiy Krivoshey <oleks...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I can't understand how protocol upgrades (to a newer version) should
> > > > > work. When I send a JoinGroupRequest with a list of assignment
> > > > > protocols (same protocol name, different versions), the first
> > > > > protocol/version always gets picked as the member version. Even if
> > > > > all consumers in the group are configured with both versions, the
> > > > > coordinator always selects the first specified version, not the one
> > > > > with the highest version number.
> > > > >
> > > > > So for example:
> > > > > consumer1: [ {name: strategyX, version: 0}, {name: strategyX, version: 1} ]
> > > > > consumer2: [ {name: strategyX, version: 0}, {name: strategyX, version: 1} ]
> > > > >
> > > > > Both will be assigned version 0 in the response to the leader. If I
> > > > > order them this way:
> > > > >
> > > > > consumer1: [ {name: strategyX, version: 1}, {name: strategyX, version: 0} ]
> > > > > consumer2: [ {name: strategyX, version: 1}, {name: strategyX, version: 0} ]
> > > > >
> > > > > Both will be assigned version 1.
> > > > >
> > > > > In this case:
> > > > >
> > > > > consumer1: [ {name: strategyX, version: 10}, {name: strategyX, version: 1} ]
> > > > > consumer2: [ {name: strategyX, version: 20}, {name: strategyX, version: 1} ]
> > > > >
> > > > > Kafka will endlessly try to rebalance the group without success,
> > > > > because consumer1 will have version: 10 and consumer2 will have
> > > > > version: 20 in the JoinGroupResponse.
> > > > >
> > > > > Can anyone please explain the process of the protocol version upgrade?
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Ewen
> > > >
> > >
> >
> >
> >
> > --
> > Thanks,
> > Ewen
> >
>



-- 
Thanks,
Ewen
