Hi David,

My bad, I did not understand your question correctly. Thanks, Stevo, for the detailed explanation.
Just in case you want to check out Kafka-based offset management, this is a very good presentation: http://www.slideshare.net/jjkoshy/offset-management-in-kafka

Thanks,
Mayuresh

On Mon, Nov 2, 2015 at 12:51 AM, Stevo Slavić <ssla...@gmail.com> wrote:

> Here is a longer and more detailed, though not necessarily easier to understand, explanation.

> When using Kafka for offsets storage, consumer offsets get stored in the (compacted) consumer offsets Kafka topic ("__consumer_offsets"). Every partition of the consumer offsets topic can store offsets from multiple consumer groups, but the offsets of a single consumer group are always stored in (get mapped to) the same partition of the consumer offsets topic (e.g. consumer groups x and y could both have their offsets stored in partition 0, while consumer group z offsets could be stored in partition 49; even if consumer group x consumes two different topics, its offsets are stored in the same partition of the consumer offsets topic). Like any other Kafka topic partition, (consumer offsets) topic partitions can be read/written only through their lead brokers, and every partition has exactly one lead broker. The lead broker of the partition where offsets are stored for a given consumer group is called the consumer coordinator broker for that consumer group. To fetch/commit offsets for a consumer group, you first need to resolve the consumer coordinator broker for that group, and then send the offset fetch/commit requests to that broker (btw, this is dynamic - which broker is coordinator for a given consumer group can change even right after it has been resolved, e.g. when the initial lead broker is lost and a replica becomes the new lead).

> Until a partition has a leader assigned and the broker that is the leader is aware of that assignment, the topic partition cannot be read or written - it is offline, and read/write requests made to that broker will return an error code in the response. The consumer offsets topic is not created automatically on broker/cluster startup - instead it gets created on the first (direct or indirect) request for consumer offsets topic metadata. That lookup triggers creation of the consumer offsets topic; parts of the topic creation process happen asynchronously to the request that triggered it, and it can take time for topic creation to actually finish. Especially if you leave the consumer offsets topic at its default settings (replication factor of 3, 50 partitions), its creation can take time - a nice default for a production cluster, but not for one used in integration tests with a single broker.

> When you try to fetch (or commit) an offset for the first time, the broker internally retrieves metadata about the consumer offsets topic, which initiates its creation when it doesn't exist yet, but that first offset fetch request (usually) fails, since the chosen consumer coordinator broker is not yet aware that it is the coordinator for that group, and it sends back NotCoordinatorForConsumerCode/NotCoordinatorForConsumerException. The docs (see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) state that "The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for."

> You could modify your code to retry fetching offsets, but not infinitely, or you could trigger consumer offsets topic init before fetching offsets.
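> Something like the following could serve as the bounded retry - a rough, untested sketch assuming the 0.8.2 kafka.javaapi classes; the group, topic, host/port and retry limits are just placeholders:

> import java.util.Collections;
>
> import kafka.common.ErrorMapping;
> import kafka.common.OffsetMetadataAndError;
> import kafka.common.TopicAndPartition;
> import kafka.javaapi.OffsetFetchRequest;
> import kafka.javaapi.OffsetFetchResponse;
> import kafka.javaapi.consumer.SimpleConsumer;
>
> public class BoundedOffsetFetch {
>     public static void main(String[] args) throws Exception {
>         // In a single-broker test cluster the only broker is also the coordinator,
>         // so connecting to it directly is fine here.
>         SimpleConsumer consumer =
>             new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "offset-test");
>         TopicAndPartition tp = new TopicAndPartition("foo", 0);
>         // versionId 1 = offsets stored in Kafka; 0 would mean ZooKeeper-based storage
>         OffsetFetchRequest request = new OffsetFetchRequest(
>             "test-group", Collections.singletonList(tp), (short) 1, 0, "offset-test");
>
>         Long offset = null;
>         for (int attempt = 0; attempt < 10 && offset == null; attempt++) {
>             OffsetFetchResponse response = consumer.fetchOffsets(request);
>             OffsetMetadataAndError result = response.offsets().get(tp);
>             if (result.error() == ErrorMapping.NoError()) {
>                 offset = result.offset(); // -1 just means nothing committed yet, still a success
>             } else if (result.error() == ErrorMapping.NotCoordinatorForConsumerCode()
>                     || result.error() == ErrorMapping.ConsumerCoordinatorNotAvailableCode()
>                     || result.error() == ErrorMapping.OffsetsLoadInProgressCode()) {
>                 Thread.sleep(500); // coordinator / offsets topic still initializing, retry
>             } else {
>                 throw new RuntimeException("Offset fetch failed, error code " + result.error());
>             }
>         }
>         consumer.close();
>         if (offset == null) {
>             throw new RuntimeException("Gave up fetching offset after 10 attempts");
>         }
>         System.out.println("Fetched offset: " + offset);
>     }
> }

> If the fetch still fails after all retries, the offsets topic initialization itself is likely the problem, which is where the init option comes in.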
> For the init option you have (at least) two alternatives.

> Modify your test: before reading/committing offsets, but not before creating your topic, init (trigger creation of) the consumer offsets topic by looking up consumer offsets topic metadata, and add some delay to let the consumer offsets topic be fully created (all of its default 50 partitions get a leader assigned, brokers become aware of it, etc.).

> Alternatively, you could do this consumer offsets topic initialization before creating your topic, but then make sure that in the broker configuration the replication factor for the consumer offsets topic is not higher than the number of brokers (1 in the scenario you described) - otherwise consumer offsets topic creation will fail. It would fail because a fresh broker in a single-broker cluster, if it receives a request for consumer offsets topic metadata, will fail to create the consumer offsets topic no matter how long you wait after the metadata lookup; it is not aware of how many live brokers there are in the cluster and will simply try to create the consumer offsets topic with the default replication factor of 3, which fails - initial replica assignment of topic partitions happens during topic creation, and the replication factor has to be <= the number of live brokers at that moment. Consumer offsets topic creation does not fail (although it takes time to finish) if it is done after some other topic creation has been requested, because that other topic creation request makes the broker aware (updates its cache) that it is the sole live broker in the cluster, and then consumer offsets topic creation will ignore the requested/configured replication factor of 3 and will (silently) fall back to a replication factor of 1 (= the number of live brokers in the cluster).

> Maybe things would be cleaner if topic creation allowed non-live brokers to be used in replica assignment. Then not only would (consumer offsets) topic creation not fail when there are not enough live brokers to meet the configured replication factor, but, even better, the Kafka cluster controller broker could do this initialization of the consumer offsets topic itself if it doesn't already exist.

> Btw, looking up topic metadata in ZooKeeper and checking that all of its partitions have a leader assigned is not enough for topic partitions to be writable/readable/online - if a broker is still not aware that it is the leader for a given partition (e.g. its metadata cache is outdated), that partition would still not be writable/readable. One could look up topic metadata from the Kafka brokers, and more specifically from the lead brokers of the partitions, to check whether they are aware that they are the lead brokers for those partitions. Not even that is a guarantee that publishing will be successful, since leadership can change at any moment and other things can fail (e.g. unreliable network), so one has to handle errors. But, as you can see, the problem is not with your topic, but with the internal consumer offsets topic.

> On Mon, Nov 2, 2015 at 1:56 AM, Stevo Slavić <ssla...@gmail.com> wrote:

> > Hello David,
> >
> > In short, the problem is not with your topic, it is with consumer offsets topic initialization.
> >
> > You could modify your code to just retry fetching offsets (until successful, where successful also includes a returned offset of -1, or until a timeout), or alternatively you could trigger consumer offsets topic init (by fetching consumer offsets topic metadata from the broker) before issuing the offset fetch request.
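> > A rough sketch of that init step, again assuming the 0.8.x kafka.javaapi classes (the helper name, timeout and polling interval are just placeholders):
> >
> > import java.util.Collections;
> >
> > import kafka.javaapi.PartitionMetadata;
> > import kafka.javaapi.TopicMetadata;
> > import kafka.javaapi.TopicMetadataRequest;
> > import kafka.javaapi.TopicMetadataResponse;
> > import kafka.javaapi.consumer.SimpleConsumer;
> >
> > public class OffsetsTopicInit {
> >     // Asking for metadata of "__consumer_offsets" is what triggers its creation;
> >     // after that we poll until every partition of it reports a leader.
> >     public static void awaitOffsetsTopic(SimpleConsumer consumer, long timeoutMs)
> >             throws InterruptedException {
> >         TopicMetadataRequest request =
> >             new TopicMetadataRequest(Collections.singletonList("__consumer_offsets"));
> >         long deadline = System.currentTimeMillis() + timeoutMs;
> >         while (System.currentTimeMillis() < deadline) {
> >             TopicMetadataResponse response = consumer.send(request);
> >             boolean ready = false;
> >             for (TopicMetadata topic : response.topicsMetadata()) {
> >                 if (!"__consumer_offsets".equals(topic.topic())) {
> >                     continue;
> >                 }
> >                 ready = !topic.partitionsMetadata().isEmpty();
> >                 for (PartitionMetadata partition : topic.partitionsMetadata()) {
> >                     if (partition.leader() == null) {
> >                         ready = false; // this partition has no leader yet
> >                     }
> >                 }
> >             }
> >             if (ready) {
> >                 return; // leaders assigned everywhere - still no hard guarantee, as noted above
> >             }
> >             Thread.sleep(500);
> >         }
> >         throw new RuntimeException("__consumer_offsets not ready after " + timeoutMs + " ms");
> >     }
> > }
> >
> > If you call this before creating any other topic on a single-broker test cluster, you also need the offsets.topic.replication.factor=1 broker setting mentioned below, or creation of the offsets topic will fail.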
> > For the init option you have (at least) two alternatives: do it after or before the request to create your topic.
> >
> > If you choose to do consumer offsets topic init before the request to create your (first) topic, make sure to configure the broker with a replication factor of 1 for the consumer offsets topic (offsets.topic.replication.factor config property), otherwise consumer offsets topic init will fail. Do this config change only for the integration test with a single broker, not for a production cluster, which should have 3 or more brokers anyway.
> >
> > Kind regards,
> > Stevo Slavic.
> >
> > On Sun, Nov 1, 2015 at 9:38 PM, David Corbin <dcor...@lancope.com> wrote:
> >
> >> Yes. I know all of that. I guess my original message was not clear.
> >>
> >> The topic metadata indicates that there is a leader.
> >>
> >> Please see my comments interspersed below.
> >> Thanks
> >> David Corbin
> >>
> >> On 10/29/15, 17:53, "Mayuresh Gharat" <gharatmayures...@gmail.com> wrote:
> >>
> >> > NotCoordinatorForConsumerException is for consumer side.
> >>
> >> I am attempting to consume messages, so this means I'm getting an exception that might be reasonable for what I'm doing.
> >>
> >> > I think if you are using simpleConsumer, then you have to do your own offset management.
> >>
> >> I am attempting to manage my offsets myself; that's why I called SimpleConsumer#fetchOffsets(), which is throwing the exception. I assume you are not suggesting that my offset management should be done entirely "outside the scope" of Kafka. If that is true, why do 4 of the methods on SimpleConsumer refer to offsets?
> >>
> >> > TopicMetadata tells you whether there is a leader for the topic partitions, replicas, ISR.
> >>
> >> Yes. After I create the topic, I "poll" the metadata until the one topic/partition has a leader, before moving on. If the metadata says there is a leader, I don't understand why I would get NotCoordinatorForConsumerException.
> >>
> >> > Thanks,
> >> >
> >> > Mayuresh
> >> >
> >> > On Wed, Oct 28, 2015 at 8:23 AM, David Corbin <dcor...@lancope.com> wrote:
> >> >
> >> >> I'm working on a project that I hope to use the SimpleConsumer on. I'm trying to write some test code around our code that is wrapping the SimpleConsumer.
> >> >>
> >> >> The environment is 1 Kafka broker and 1 ZooKeeper.
> >> >> The test goes like this:
> >> >>
> >> >> Create a topic ("foo", 1 partition, 1 replica)
> >> >> Create a Producer, and send a message
> >> >> Create a SimpleConsumer, and try to read the offset
> >> >>
> >> >> Failure with: NotCoordinatorForConsumerException
> >> >>
> >> >> If I continue to retry for an extended period, I continue to get that exception.
> >> >>
> >> >> As near as I can tell, the topic metadata says there is a leader, but the broker thinks it's being rebalanced.
> >> >> I've done the above test immediately after stopping, cleaning out old data, and restarting both ZooKeeper and Kafka.
> >> >>
> >> >> Suggestions welcome.
> >> >
> >> >
> >> > --
> >> > -Regards,
> >> > Mayuresh R. Gharat
> >> > (862) 250-7125

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125