Here is a longer and more detailed, though not necessarily easier to
follow, explanation.

When using Kafka for offsets storage, consumer offsets are stored in a
compacted Kafka topic named "__consumer_offsets". Every partition of the
consumer offsets topic can store offsets from multiple consumer groups, but
the offsets of a single consumer group are always mapped to (stored in) the
same partition of that topic. For example, the offsets of consumer groups x
and y could both be stored in partition 0, while the offsets of consumer
group z could be stored in partition 49; even if consumer group x consumes
two different topics, its offsets end up in the same partition of the
consumer offsets topic.

Like any other Kafka topic partition, a consumer offsets topic partition
can only be read or written via its lead broker, and every partition has
exactly one lead broker. The lead broker of the partition where a given
consumer group's offsets are stored is called the consumer coordinator
broker for that group. To fetch or commit offsets for a consumer group, you
first resolve the consumer coordinator broker for that group, and then send
the fetch/commit offsets request to that broker. Note that this is dynamic:
which broker is coordinator for a given group can change even right after
it has been resolved, e.g. when the initial lead broker is lost and a
replica becomes the new lead. Until a partition has a leader assigned and
the broker that is the leader is aware of that assignment, the partition
cannot be read or written; it is offline, and read/write requests sent to
that broker will return an error code in the response.

The consumer offsets topic is not created automatically on broker/cluster
startup; instead it gets created on the first (direct or indirect) request
for consumer offsets topic metadata. That lookup triggers creation of the
topic, parts of the topic creation process happen asynchronously to the
request, and it can take a while for creation to fully finish. This is
especially true if you leave the consumer offsets topic at its default
settings (replication factor of 3, 50 partitions): a nice default for a
production cluster, but not for a single-broker cluster used in
integration tests.
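The group-to-partition mapping described above can be illustrated with a
small sketch. It mirrors the broker-side logic (the absolute value of the
group id's hash code, modulo the number of consumer offsets topic
partitions, 50 by default); the class and method names below are
illustrative, not Kafka API:

```java
// Sketch of how a consumer group id is mapped to a partition of the
// consumer offsets topic. Mirrors the broker's logic
// (abs(groupId.hashCode) % offsets.topic.num.partitions); class and
// method names here are hypothetical, not Kafka API.
public class OffsetsPartitionMapper {
    public static int partitionFor(String groupId, int offsetsTopicPartitions) {
        // Mask the sign bit instead of calling Math.abs, since
        // Math.abs(Integer.MIN_VALUE) is still negative.
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // The same group always maps to the same partition, regardless
        // of which topics the group consumes.
        System.out.println(partitionFor("group-x", 50));
        System.out.println(partitionFor("group-y", 50));
    }
}
```

This is also why all offsets of one group live on one coordinator broker:
the mapping depends only on the group id, not on the consumed topic.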

When you try to fetch (or commit) an offset for the first time, the broker
internally retrieves metadata about the consumer offsets topic, which
initiates creation of that topic when it doesn't exist yet. That first
offset fetch request, however, (usually) fails, since the chosen consumer
coordinator broker is not yet aware that it is the coordinator for that
group, and it sends back
NotCoordinatorForConsumerCode/NotCoordinatorForConsumerException. The docs
(see
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
) state that "The broker returns this error code if it receives an offset
fetch or commit request for a consumer group that it is not a coordinator
for."

You could modify your code to retry fetching offsets (with a bounded number
of retries, not infinitely), or you could trigger consumer offsets topic
init before fetching offsets. For the init option you have (at least) two
alternatives.
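The retry approach can be sketched generically. In the real code the
action would be the SimpleConsumer offset fetch and the caught exception
would be NotCoordinatorForConsumerException; the helper below is my own
sketch, not Kafka API, and stays deliberately generic so it runs without
a broker:

```java
import java.util.function.Supplier;

// Generic bounded-retry helper (illustrative sketch, not Kafka API):
// retries an action while it throws, up to maxAttempts, sleeping
// backoffMs between attempts, and rethrows the last failure if the
// budget is exhausted.
public class BoundedRetry {
    public static <T> T retry(Supplier<T> action, int maxAttempts, long backoffMs)
            throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                // In the Kafka case: NotCoordinatorForConsumerException
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last; // give up after maxAttempts, never retry forever
    }
}
```

The important part is the bound: if the consumer offsets topic creation
genuinely failed (see the replication factor discussion below), no amount
of retrying will help, so the loop must eventually give up.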

Modify your test so that, after creating your own topic but before
reading/committing offsets, you init (trigger creation of) the consumer
offsets topic by looking up its metadata, and then add some delay to let
the consumer offsets topic be fully created (all of its default 50
partitions get a leader assigned, the brokers become aware of it, etc.).
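Rather than a fixed sleep, the delay is more robust as a poll with a
timeout. Here is a generic sketch of such a helper (my own, not Kafka
API); in the test, the readiness check would be something like "metadata
for all consumer offsets topic partitions reports a leader":

```java
import java.util.function.BooleanSupplier;

// Poll-until-ready helper for tests (illustrative sketch): repeatedly
// evaluates a readiness check until it passes or a timeout elapses,
// instead of sleeping for a fixed, hopefully-long-enough duration.
public class AwaitCondition {
    public static boolean await(BooleanSupplier ready, long timeoutMs, long pollIntervalMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (ready.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollIntervalMs);
        }
        return ready.getAsBoolean(); // one last check at the deadline
    }
}
```

As discussed further below, even "all partitions have a leader" is not a
complete readiness check, so the offset fetch itself should still handle
errors.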

You could also do this consumer offsets topic initialization before
creating your own topic, but then make sure that in the broker
configuration the replication factor for the consumer offsets topic is not
higher than the number of brokers (1 in the scenario you described);
otherwise consumer offsets topic creation will fail. It fails because a
fresh broker in a single-broker cluster, if it receives a request for
consumer offsets topic metadata, is not yet aware of how many live brokers
there are in the cluster; it will just try to create the consumer offsets
topic with the default replication factor of 3, and that will fail no
matter how long you wait after the metadata lookup. Initial replica
assignment for a topic's partitions happens during topic creation, and the
replication factor has to be <= the number of live brokers at that moment.
Consumer offsets topic creation does not fail (although it takes time to
finish) if it is done after some other topic creation has been requested,
because that other topic creation request makes the broker aware (updates
its cache) that it is the sole live broker in the cluster; consumer offsets
topic creation will then ignore the requested/configured replication factor
of 3 and will (silently) fall back to a replication factor of 1 (= the
number of live brokers in the cluster).
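For the single-broker integration test scenario, the relevant broker
settings could look like the fragment below (the property names are the
standard broker config names; the values are what a test setup would use,
with the production defaults noted in comments):

```properties
# Single-broker integration test settings for the consumer offsets topic.
offsets.topic.replication.factor=1
# default is 3; must be <= number of live brokers at creation time

offsets.topic.num.partitions=1
# default is 50; fewer partitions make topic creation finish faster in tests
```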

Maybe things would be cleaner if topic creation allowed non-live brokers to
be used in replica assignment. Then not only would (consumer offsets) topic
creation not fail when there are not enough live brokers to meet the
configured replication factor, but, even better, the Kafka cluster
controller broker could do this initialization of the consumer offsets
topic itself if the topic doesn't already exist.

Btw, looking up topic metadata in ZooKeeper and checking that all of the
topic's partitions have a leader assigned is not enough for the partitions
to be writable/readable/online; if a broker is still not aware that it is
the leader for a given partition (e.g. its metadata cache is outdated),
that partition will still not be writable/readable. One could look up topic
metadata from the Kafka brokers themselves, and more specifically from the
lead brokers of the partitions, to check whether they are aware that they
are the lead brokers for those partitions. Not even that is a guarantee
that publishing will be successful, since leadership can change at any
moment and other things can fail (e.g. an unreliable network), so one has
to handle errors. But, as you can see, the problem is not with your topic,
but with the internal consumer offsets topic.


On Mon, Nov 2, 2015 at 1:56 AM, Stevo Slavić <ssla...@gmail.com> wrote:

> Hello David,
>
> In short, problem is not with your topic, it is with consumer offsets
> topic initialization.
>
> You could modify your code to just retry fetching offsets (until
> successful where successful is also return of -1 offset, or timeout), or
> alternatively you could trigger consumer offsets topic init (by fetching
> consumer offsets topic metadata from broker) before issuing offset fetch
> request.
>
> For the init option you have (at least) two alternatives: do it after or
> before request to create your topic.
>
> If you chose to do consumer offsets topic init before request to create
> your (first) topic, make sure to configure broker with replication factor
> of 1 for consumer offsets topic (offsets.topic.replication.factor config
> property) otherwise consumer offsets topic init will fail. Do this config
> change only for integration test with single broker, but not for production
> cluster which should have 3 or more brokers anyway.
>
> Kind regards,
> Stevo Slavic.
>
>
>
> On Sun, Nov 1, 2015 at 9:38 PM, David Corbin <dcor...@lancope.com> wrote:
>
>> Yes.  I know all of that.  I guess my original message was not clear.
>>
>> The topic metadata indicates that there is a leader.
>>
>> Please see my comments interspersed below.
>> Thanks
>> David Corbin
>>
>> On 10/29/15, 17:53, "Mayuresh Gharat" <gharatmayures...@gmail.com> wrote:
>>
>> > NotCoordinatorForConsumerException is for consumer side.
>>
>> I am attempting to consume messages, so this means I'm getting an
>> exception that might be reasonable for what I'm doing.
>>
>> >I think if you
>> >are using simpleConsumer, then you have to do your own offset management.
>>
>> I am attempting to manage my offsets myself; that's why I called
>> SimpleConsumer#fetchOffsets() which is throwing the exception.  I assume
>> you are not suggesting that my offset management should be done entirely
>> "outside the scope" of Kafka.  If that is true, why do 4 of 3 methods on
>> SimpleConsumer refer to offsets?
>>
>>
>> >
>> >TopicMetadata tells you whether there is a leader for the topic
>> >partitions,
>> >replicas, ISR.
>>
>>
>> Yes.  After I create the topic, I "poll" the metadata until the one
>> topic/partition has a leader, before moving on.  If the metadata says
>> there is a leader, I don't understand why I would get
>> NotCoordinatorForConsumerException
>>
>> >
>> >Thanks,
>> >
>> >Mayuresh
>> >
>> >On Wed, Oct 28, 2015 at 8:23 AM, David Corbin <dcor...@lancope.com>
>> wrote:
>> >
>> >> I'm working on a project that I hope to use the SimpleConsumer on.  I'm
>> >> trying to write some test code around our code that is wrapping the
>> >> SimpleConsumer.
>> >>
>> >> The environment is 1 kafka broker, and 1 zookeeper
>> >> The test goes like this:
>> >>
>> >> Create a topic ("foo", 1 partition, 1 replica)
>> >> Create a Producer, and send a message
>> >> Create a SimpleConsumer, and try to read the offset
>> >>
>> >> Failure with: NotCoordinatorForConsumerException
>> >>
>> >> If I continue to retry for an extended period, I continue to get that
>> >> exception.
>> >>
>> >> As near as I can tell, the topic metadata says there is a leader, but
>> >>the
>> >> the broker thinks it's being rebalanced.
>> >> I've done the above test immediately after stop, clean out old data,
>> and
>> >> restart both zookeeper and kafka.
>> >>
>> >> Suggestions welcome.
>> >>
>> >>
>> >
>> >
>> >--
>> >-Regards,
>> >Mayuresh R. Gharat
>> >(862) 250-7125
>>
>>
>