Here is a longer and more detailed (though not necessarily easier to follow) explanation.
When using Kafka for offsets storage, consumer offsets are stored in a compacted internal Kafka topic, "__consumer_offsets". Every partition of the consumer offsets topic can hold offsets from multiple consumer groups, but the offsets of a single consumer group always map to the same partition of the consumer offsets topic (e.g. the offsets of consumer groups x and y could both be stored in partition 0, while the offsets of consumer group z could be stored in partition 49; even if consumer group x consumes two different topics, its offsets are stored in that one partition).

Like any other Kafka topic partition, a partition of the consumer offsets topic can only be read and written through its lead broker, and every partition has exactly one lead broker. The lead broker of the partition where offsets are stored for a given consumer group is called the consumer coordinator broker for that group. To fetch or commit offsets for a consumer group, you first have to resolve the consumer coordinator broker for that group, and then send the fetch/commit offsets request to that broker. (Note that this is dynamic: which broker is coordinator for a given consumer group can change even right after it has been resolved, e.g. when the initial lead broker is lost and a replica becomes the new leader.) Until a leader has been assigned for a partition and the broker holding that leadership is aware of the assignment, the partition is offline and cannot be read or written - read/write requests sent to that broker return an error code in the response.

The consumer offsets topic is not created automatically on broker/cluster startup; instead it gets created on the first (direct or indirect) request for consumer offsets topic metadata. That lookup triggers creation of the consumer offsets topic, parts of the creation process happen asynchronously to the creation request, and it can take time for the creation to actually fully finish.
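To illustrate the group-to-partition mapping described above: internally Kafka maps a consumer group to a partition of the offsets topic by taking a non-negative hash of the group id modulo the partition count. The sketch below mirrors that idea (the group names are made up, and this is an illustration of the mapping, not a call into the Kafka API):

```java
public class OffsetsPartitionMapping {
    // Default partition count of the "__consumer_offsets" topic.
    static final int OFFSETS_TOPIC_PARTITIONS = 50;

    // Non-negative hash of the group id, modulo the offsets topic
    // partition count - the same group always lands in the same partition,
    // no matter how many topics it consumes.
    static int partitionFor(String consumerGroup) {
        return (consumerGroup.hashCode() & 0x7fffffff) % OFFSETS_TOPIC_PARTITIONS;
    }

    public static void main(String[] args) {
        System.out.println("group-x -> partition " + partitionFor("group-x"));
        System.out.println("group-y -> partition " + partitionFor("group-y"));
    }
}
```

The lead broker of whichever partition a group maps to is that group's coordinator broker.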
Creation of the consumer offsets topic can take especially long if you leave its default settings (replication factor of 3, 50 partitions) - a sensible default for a production cluster, but not for one used in integration tests with a single broker. When you try to fetch (or commit) an offset for the first time, the broker internally retrieves metadata about the consumer offsets topic, which initiates its creation if it doesn't exist yet. That first offset fetch request, however, usually fails, since the chosen consumer coordinator broker is not yet aware that it is the coordinator for that group, and it responds with NotCoordinatorForConsumerCode/NotCoordinatorForConsumerException. The protocol docs (see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol ) state that "The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for."

You could modify your code to retry fetching offsets (with a bound, not infinitely), or you could trigger consumer offsets topic initialization before fetching offsets. For the init option you have (at least) two alternatives. One is to modify your test so that, after creating your own topic but before reading/committing offsets, it initializes (triggers creation of) the consumer offsets topic by looking up consumer offsets topic metadata, and then adds some delay to let the consumer offsets topic be fully created (all of its default 50 partitions get a leader assigned, brokers become aware of the assignments, etc.). The other is to do this consumer offsets topic initialization before creating your own topic - but then make sure that in the broker configuration the replication factor for the consumer offsets topic is not higher than the number of brokers (1 in the scenario you described), otherwise consumer offsets topic creation will fail.
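A bounded retry around the offset fetch could look like the sketch below. This is a self-contained illustration, not Kafka API code: `NotCoordinatorException` stands in for the real `NotCoordinatorForConsumerException`, and the `Callable<Long>` stands in for your wrapper around `SimpleConsumer#fetchOffsets()`:

```java
import java.util.concurrent.Callable;

public class BoundedOffsetFetch {
    // Stand-in for kafka.common.NotCoordinatorForConsumerException.
    static class NotCoordinatorException extends RuntimeException {}

    // Retry the fetch up to maxAttempts times, sleeping between attempts.
    // The first attempt typically fails while the coordinator broker is
    // still catching up, so a handful of retries is usually enough;
    // assumes maxAttempts >= 1.
    static long fetchOffsetWithRetry(Callable<Long> fetch,
                                     int maxAttempts,
                                     long sleepMillis) throws Exception {
        NotCoordinatorException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // -1 also counts as success (no offset stored yet).
                return fetch.call();
            } catch (NotCoordinatorException e) {
                last = e;
                Thread.sleep(sleepMillis);
            }
        }
        throw last; // bounded: do not retry forever
    }

    public static void main(String[] args) throws Exception {
        // Simulate a coordinator that is not ready for the first two attempts.
        final int[] calls = {0};
        long offset = fetchOffsetWithRetry(() -> {
            if (++calls[0] < 3) throw new NotCoordinatorException();
            return 42L;
        }, 5, 10L);
        System.out.println("fetched offset " + offset + " after " + calls[0] + " attempts");
    }
}
```

The same shape works for commit requests; only the stand-in callable changes.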
It would fail because a fresh broker in a single-broker cluster, on receiving a request for consumer offsets topic metadata, will fail to create the consumer offsets topic no matter how long the delay after the metadata lookup: the broker is not yet aware of how many live brokers there are in the cluster, so it will just try to create the consumer offsets topic with the default replication factor of 3, and that will fail - the initial partition replica assignment happens during topic creation, and the replication factor has to be <= the number of live brokers at that moment. Consumer offsets topic creation does not fail (although it still takes time to finish) if it happens after some other topic creation has been requested, because that other topic creation request makes the broker aware (updates its cache) that it is the sole live broker in the cluster; consumer offsets topic creation will then ignore the requested/configured replication factor of 3 and (silently) fall back to a replication factor of 1 (= the number of live brokers in the cluster).

Maybe things would be cleaner if topic creation allowed non-live brokers to be used in the replica assignment. Then not only would (consumer offsets) topic creation not fail when there are not enough live brokers to meet the configured replication factor, but, even better, the Kafka cluster controller broker could do this initialization of the consumer offsets topic itself if it doesn't already exist.

Btw, looking up topic metadata in ZooKeeper and checking that all of the topic's partitions have a leader assigned is not enough to conclude that the partitions are writable/readable/online - if a broker is not yet aware that it is the leader for a given partition (e.g. the broker's metadata cache is outdated), that partition is still not writable/readable. One can instead look up topic metadata from the Kafka brokers themselves, specifically from the lead brokers of the partitions, to check that they are aware they are the lead brokers for those partitions.
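For a single-broker integration test, the relevant broker settings would be along these lines (these are the standard broker config property names; keep the defaults of 50 partitions / replication factor 3 for a production cluster):

```properties
# Single-broker test settings for the "__consumer_offsets" topic.
# Replication factor must be <= number of live brokers at creation time.
offsets.topic.replication.factor=1
offsets.topic.num.partitions=1
```

With a single partition and replication factor 1, the consumer offsets topic also gets created and led much faster than with the defaults.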
Not even that guarantees that publishing will succeed, since leadership can change at any moment and other things can fail (e.g. an unreliable network), so one has to handle errors. But, as you can see, the problem is not with your topic, but with the internal consumer offsets topic.

On Mon, Nov 2, 2015 at 1:56 AM, Stevo Slavić <ssla...@gmail.com> wrote:

> Hello David,
>
> In short, problem is not with your topic, it is with consumer offsets
> topic initialization.
>
> You could modify your code to just retry fetching offsets (until
> successful where successful is also return of -1 offset, or timeout), or
> alternatively you could trigger consumer offsets topic init (by fetching
> consumer offsets topic metadata from broker) before issuing offset fetch
> request.
>
> For the init option you have (at least) two alternatives: do it after or
> before request to create your topic.
>
> If you chose to do consumer offsets topic init before request to create
> your (first) topic, make sure to configure broker with replication factor
> of 1 for consumer offsets topic (offsets.topic.replication.factor config
> property) otherwise consumer offsets topic init will fail. Do this config
> change only for integration test with single broker, but not for production
> cluster which should have 3 or more brokers anyway.
>
> Kind regards,
> Stevo Slavic.
>
>
> On Sun, Nov 1, 2015 at 9:38 PM, David Corbin <dcor...@lancope.com> wrote:
>
>> Yes. I know all of that. I guess my original message was not clear.
>>
>> The topic metadata indicates that there is a leader.
>>
>> Please see my comments interspersed below.
>> Thanks
>> David Corbin
>>
>> On 10/29/15, 17:53, "Mayuresh Gharat" <gharatmayures...@gmail.com> wrote:
>>
>> > NotCoordinatorForConsumerException is for consumer side.
>>
>> I am attempting to consume messages, so this means I'm getting an
>> exception that might be reasonable for what I'm doing.
>>
>> > I think if you
>> > are using simpleConsumer, then you have to do your own offset management.
>>
>> I am attempting to manage my offsets myself; that's why I called
>> SimpleConsumer#fetchOffsets() which is throwing the exception. I assume
>> you are not suggesting that my offset management should be done entirely
>> "outside the scope" of Kafka. If that is true, why do 4 of 3 methods on
>> SimpleConsumer refer to offsets?
>>
>> >
>> > TopicMetadata tells you whether there is a leader for the topic
>> > partitions, replicas, ISR.
>>
>> Yes. After I create the topic, I "poll" the metadata until the one
>> topic/partition has a leader, before moving on. If the metadata says
>> there is a leader, I don't understand why I would get
>> NotCoordinatorForConsumerException
>>
>> >
>> > Thanks,
>> >
>> > Mayuresh
>> >
>> > On Wed, Oct 28, 2015 at 8:23 AM, David Corbin <dcor...@lancope.com> wrote:
>> >
>> >> I'm working on a project that I hope to use the SimpleConsumer on. I'm
>> >> trying to write some test code around our code that is wrapping the
>> >> SimpleConsumer.
>> >>
>> >> The environment is 1 kafka broker, and 1 zookeeper
>> >> The test goes like this:
>> >>
>> >> Create a topic ("foo", 1 partition, 1 replica)
>> >> Create a Producer, and send a message
>> >> Create a SimpleConsumer, and try to read the offset
>> >>
>> >> Failure with: NotCoordinatorForConsumerException
>> >>
>> >> If I continue to retry for an extended period, I continue to get that
>> >> exception.
>> >>
>> >> As near as I can tell, the topic metadata says there is a leader, but
>> >> the broker thinks it's being rebalanced.
>> >> I've done the above test immediately after stop, clean out old data, and
>> >> restart both zookeeper and kafka.
>> >>
>> >> Suggestions welcome.
>> >>
>> >
>> >
>> > --
>> > -Regards,
>> > Mayuresh R. Gharat
>> > (862) 250-7125
>>