> Here is a bit longer and more detailed, not necessarily better
> understandable explanation.
> When using Kafka for offsets storage, consumer offsets get stored in
> (compacted) consumer offsets Kafka topic ("__consumer_offsets"). Every
> partition of consumer offsets topic could store offsets from multiple
> consumer groups, but offsets of a single consumer group will always be
> stored in (get mapped to) same partition of consumer offsets topic (e.g.
> consumer group x and y offsets could be both stored in partition 0, while
> consumer group z offsets could be stored in partition 49 of consumer
> offsets topic; even if consumer group x is consuming two different topics,
> offsets would be stored in same partition of consumer offsets topic). Like
> any other Kafka topic partition, one can read/write (consumer offsets)
> topic partitions only from their lead brokers, and every partition has only
> one lead broker. Lead broker of a partition where offsets are stored for
> given consumer group is called consumer coordinator broker for that
> consumer group. To fetch/commit offsets for consumer group, you first need
> to resolve consumer coordinator broker for that consumer group, and then
> send fetch/commit offsets to that broker (btw, it's dynamic, can change
> even after just being resolved which broker is coordinator broker for given
> consumer group, e.g. when initial lead broker is lost and a replica becomes
> a new lead). Until there is a leader assigned for a partition and broker
> who is leader is not yet aware of that assignment, topic partition cannot
> be read/written, it's offline - read/write requests made to that broker
> will return error code in response. Consumer offsets topic is not created
> automatically on broker/cluster startup - instead it gets created on first
> (direct or indirect) request for consumer offsets topic metadata... That
> lookup triggers creation of consumer offsets topic, parts of the topic
> creation process happen async to request for topic creation, and it can
> take time for topic creation to actually fully finish. Especially if you
> leave consumer offsets topic default settings (replication factor of 3, 50
> partitions) consumer offsets topic creation can take time - this is nice
> default for production cluster, but not for one used in integration tests
> with single broker.
> When you try to fetch (or commit) offset for the first time, broker
> internally retrieves metadata about consumer offsets topic, which initiates
> consumer offsets topic creation when it doesn't exist, but that first
> request to fetch offset (usually) fails, since chosen consumer coordinator
> broker is not yet aware that it is coordinator for that group, and sends
> back NotCoordinatorForConsumerCode/NotCoordinatorForConsumerException. Docs
> (see
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> ) state that "The broker returns this error code if it receives an offset
> fetch or commit request for a consumer group that it is not a coordinator
> for."
> You could modify your code to retry fetching offsets, but not infinitely,
> or you could trigger consumer offsets topic init before fetching offsets.
> For the init option you have (at least) two alternatives.
> Modify your test, before reading/committing offsets but not before creating
> your topic to init (trigger creation of) consumer offsets topic by looking
> up consumer offsets topic metadata, and add some delay to let consumer
> offsets topic be fully created (all of its default 50 partitions get leader
> assigned, broker aware, etc.).
> You could do this consumer offsets topic initialization before creating
> your topic, but then make sure that in broker configuration replication
> factor for consumer offsets topic is not higher than number of brokers (1
> in scenario your described) - otherwise consumer offsets topic creation
> will fail. It would fail since fresh broker in single broker cluster, if it
> receives a request for consumer offsets topic metadata, will fail to create
> consumer offsets topic, no matter how long delay after metadata lookup; it
> is not aware of how many live brokers there are in cluster and will just
> try to create consumer offsets topic with default replication factor of 3
> and that will fail, initial topic partitions replica assignment happens
> during topic creation and replication factor has to be <= than number of
> live brokers when topic is created. Consumer offsets topic creation does
> not fail (although it takes time to finish), if it is done after some other
> topic creation has been requested, because that other topic creation
> request makes broker aware (updates its cache) that it is a sole live
> broker in the cluster, and then consumer offsets topic creation will ignore
> requested/configured replication factor of 3 and will (silently) fallback
> and use replication factor of 1 (= number of live brokers in cluster).
> Maybe things would be cleaner if topic creation allowed non-live brokers to
> be used in replica assignment. Then not only would (consumer offsets) topic
> creation not fail if there are not enough of live brokers to meet
> configured replication factor, but even better Kafka cluster controller
> broker could do this initialization of consumer offsets topic if it doesn't
> already exist.
> Btw, looking up topic metadata in zookeeper and checking that all of its
> partitions have a leader assigned is not enough for topic partitions to be
> writable/readable/online - if broker is still not aware that it is the
> leader for a given partition (e.g. broker metadata cache is outdated), that
> partition would still not be writable/readable. One could lookup topic
> metadata from Kafka brokers, and more specifically from lead brokers of
> partitions to check if they are ware that they are lead brokers for that
> partition. Not even that is guarantee that publishing will be successful,
> since leadership can change at any moment, and other things can fail (e.g.
> unreliable network), so one has to handle errors. But, as you see problem
> is not with your topic, but with internal consumer offsets topic.
