Thank you for the detailed explanation. Is it essential that offsets be stored in Kafka, or could they be stored outside the Kafka/ZooKeeper system? Will it affect how logs are managed, and when "older" messages are purged? Or are they two independent systems?
On 11/2/15, 03:51, "Stevo Slavić" <ssla...@gmail.com> wrote:

> Here is a longer and more detailed, though not necessarily easier to follow, explanation.
>
> When using Kafka for offsets storage, consumer offsets get stored in the (compacted) consumer offsets Kafka topic ("__consumer_offsets"). Every partition of the consumer offsets topic can store offsets from multiple consumer groups, but the offsets of a single consumer group are always stored in (get mapped to) the same partition of the consumer offsets topic. For example, the offsets of consumer groups x and y could both be stored in partition 0, while the offsets of consumer group z could be stored in partition 49; even if consumer group x consumes two different topics, its offsets are stored in the same partition of the consumer offsets topic. Like any other Kafka topic partition, consumer offsets topic partitions can be read/written only through their lead broker, and every partition has exactly one lead broker. The lead broker of the partition where offsets are stored for a given consumer group is called the consumer coordinator broker for that consumer group. To fetch/commit offsets for a consumer group, you first need to resolve the consumer coordinator broker for that group, and then send the fetch/commit offsets request to that broker. (Btw, this is dynamic: which broker is the coordinator for a given consumer group can change even right after it has been resolved, e.g. when the initial lead broker is lost and a replica becomes the new lead.) Until a partition has a leader assigned and the broker that is the leader is aware of that assignment, the topic partition cannot be read/written - it is offline, and read/write requests made to that broker will return an error code in the response.
>
> The consumer offsets topic is not created automatically on broker/cluster startup; instead it gets created on the first (direct or indirect) request for consumer offsets topic metadata. That lookup triggers creation of the consumer offsets topic, parts of the topic creation process happen asynchronously to the request, and it can take time for topic creation to actually fully finish. Especially if you leave the consumer offsets topic at its default settings (replication factor of 3, 50 partitions), creation can take time - a nice default for a production cluster, but not for one used in integration tests with a single broker.
>
> When you try to fetch (or commit) an offset for the first time, the broker internally retrieves metadata about the consumer offsets topic, which initiates creation of the topic when it doesn't exist. That first request to fetch an offset, however, usually fails, since the chosen consumer coordinator broker is not yet aware that it is the coordinator for that group, and it sends back NotCoordinatorForConsumerCode/NotCoordinatorForConsumerException. The docs (see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) state that "The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for."
>
> You could modify your code to retry fetching offsets (but not infinitely), or you could trigger consumer offsets topic init before fetching offsets.
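To illustrate the retry approach, a minimal sketch against the 0.8.x-era kafka.javaapi SimpleConsumer API (the API this thread uses) might look like the following; the client id, retry count and back-off are made-up placeholders, not values from the thread:

    import java.util.Collections;

    import kafka.common.ErrorMapping;
    import kafka.common.OffsetMetadataAndError;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetFetchRequest;
    import kafka.javaapi.OffsetFetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class OffsetFetchRetry {

        // Retries the offset fetch until the coordinator broker is ready, or gives up.
        static long fetchOffsetWithRetry(SimpleConsumer consumer, String group,
                                         TopicAndPartition tp, int maxAttempts)
                throws InterruptedException {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                OffsetFetchRequest request = new OffsetFetchRequest(
                        group,
                        Collections.singletonList(tp),
                        (short) 1,              // version >= 1: offsets stored in Kafka; 0: ZooKeeper
                        attempt,                // correlation id
                        "offset-fetch-test");   // client id (placeholder)
                OffsetFetchResponse response = consumer.fetchOffsets(request);
                OffsetMetadataAndError result = response.offsets().get(tp);
                if (result.error() == ErrorMapping.NoError()) {
                    // -1 means no offset committed yet for this group/partition; still a successful fetch.
                    return result.offset();
                }
                // Typically NotCoordinatorForConsumerCode (or OffsetsLoadInProgressCode) while the
                // consumer offsets topic is still being created/loaded - back off and try again.
                Thread.sleep(500);
            }
            throw new IllegalStateException("Offset fetch did not succeed after " + maxAttempts + " attempts");
        }
    }

The point of the bounded loop is exactly what Stevo describes: don't retry forever, but give the coordinator time to finish creating and loading the offsets topic.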
> For the init option you have (at least) two alternatives.
>
> One option: modify your test to init (trigger creation of) the consumer offsets topic by looking up consumer offsets topic metadata - after creating your topic but before reading/committing offsets - and add some delay to let the consumer offsets topic be fully created (all of its default 50 partitions get a leader assigned, the broker becomes aware of it, etc.).
>
> You could also do this consumer offsets topic initialization before creating your topic, but then make sure that in the broker configuration the replication factor for the consumer offsets topic is not higher than the number of brokers (1 in the scenario you described) - otherwise consumer offsets topic creation will fail. It would fail because a fresh broker in a single-broker cluster, if it receives a request for consumer offsets topic metadata, will fail to create the consumer offsets topic no matter how long the delay after the metadata lookup: it is not yet aware of how many live brokers there are in the cluster and will just try to create the consumer offsets topic with the default replication factor of 3, and that will fail, since the initial replica assignment for the topic's partitions happens during topic creation and the replication factor has to be <= the number of live brokers at that time. Consumer offsets topic creation does not fail (although it takes time to finish) if it happens after some other topic creation has already been requested, because that other topic creation request makes the broker aware (updates its cache) that it is the sole live broker in the cluster, and consumer offsets topic creation will then ignore the requested/configured replication factor of 3 and will (silently) fall back to a replication factor of 1 (= the number of live brokers in the cluster).
>
> Maybe things would be cleaner if topic creation allowed non-live brokers to be used in the replica assignment. Then not only would (consumer offsets) topic creation not fail when there are not enough live brokers to meet the configured replication factor, but, even better, the Kafka cluster controller broker could do this initialization of the consumer offsets topic itself if the topic doesn't already exist.
>
> Btw, looking up topic metadata in ZooKeeper and checking that all of its partitions have a leader assigned is not enough for the topic partitions to be writable/readable/online - if a broker is still not aware that it is the leader for a given partition (e.g. its metadata cache is outdated), that partition will still not be writable/readable. One could look up topic metadata from the Kafka brokers, and more specifically from the lead brokers of the partitions, to check whether they are aware that they are the lead brokers for those partitions. Not even that guarantees that publishing will succeed, since leadership can change at any moment and other things can fail (e.g. an unreliable network), so one has to handle errors. But, as you can see, the problem is not with your topic, it is with the internal consumer offsets topic.
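To make the metadata-lookup alternative concrete, here is a sketch (again assuming the 0.8.x kafka.javaapi classes; the timeout and poll interval are illustrative) that triggers creation of "__consumer_offsets" and then polls until every partition reports a leader:

    import java.util.Collections;

    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class OffsetsTopicInit {

        // Requesting metadata for "__consumer_offsets" triggers its (asynchronous) creation if it
        // does not exist yet; poll until every partition reports a leader or the deadline passes.
        static void ensureOffsetsTopicReady(SimpleConsumer consumer, long timeoutMs)
                throws InterruptedException {
            TopicMetadataRequest request =
                    new TopicMetadataRequest(Collections.singletonList("__consumer_offsets"));
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                TopicMetadataResponse response = consumer.send(request);
                for (TopicMetadata topic : response.topicsMetadata()) {
                    boolean ready = !topic.partitionsMetadata().isEmpty();
                    for (PartitionMetadata partition : topic.partitionsMetadata()) {
                        if (partition.leader() == null) {
                            ready = false;  // this partition has no leader assigned yet
                        }
                    }
                    if (ready) {
                        return;  // all partitions of __consumer_offsets report a leader
                    }
                }
                Thread.sleep(500);  // creation happens asynchronously; give the broker(s) time
            }
            throw new IllegalStateException("__consumer_offsets not fully created within " + timeoutMs + " ms");
        }
    }

Even when this returns, leadership can still move and the coordinator's cache may lag (Stevo's caveat above), so the offset fetch itself should still handle errors, e.g. with a bounded retry loop like the earlier sketch.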
> On Mon, Nov 2, 2015 at 1:56 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>
>> Hello David,
>>
>> In short, the problem is not with your topic, it is with consumer offsets topic initialization.
>>
>> You could modify your code to just retry fetching offsets (until successful, where "successful" includes a returned offset of -1, or until a timeout), or alternatively you could trigger consumer offsets topic init (by fetching consumer offsets topic metadata from the broker) before issuing the offset fetch request.
>>
>> For the init option you have (at least) two alternatives: do it after or before the request to create your topic.
>>
>> If you choose to do consumer offsets topic init before the request to create your (first) topic, make sure to configure the broker with a replication factor of 1 for the consumer offsets topic (the offsets.topic.replication.factor config property), otherwise consumer offsets topic init will fail. Do this config change only for the integration test with a single broker, not for a production cluster, which should have 3 or more brokers anyway.
>>
>> Kind regards,
>> Stevo Slavic.
>>
>> On Sun, Nov 1, 2015 at 9:38 PM, David Corbin <dcor...@lancope.com> wrote:
>>
>>> Yes. I know all of that. I guess my original message was not clear.
>>>
>>> The topic metadata indicates that there is a leader.
>>>
>>> Please see my comments interspersed below.
>>> Thanks
>>> David Corbin
>>>
>>> On 10/29/15, 17:53, "Mayuresh Gharat" <gharatmayures...@gmail.com> wrote:
>>>
>>> > NotCoordinatorForConsumerException is for consumer side.
>>>
>>> I am attempting to consume messages, so this means I'm getting an exception that might be reasonable for what I'm doing.
>>>
>>> > I think if you are using simpleConsumer, then you have to do your own offset management.
>>>
>>> I am attempting to manage my offsets myself; that's why I called SimpleConsumer#fetchOffsets(), which is throwing the exception. I assume you are not suggesting that my offset management should be done entirely "outside the scope" of Kafka. If that is true, why do 4 of the methods on SimpleConsumer refer to offsets?
>>>
>>> > TopicMetadata tells you whether there is a leader for the topic partitions, replicas, ISR.
>>>
>>> Yes. After I create the topic, I "poll" the metadata until the one topic/partition has a leader, before moving on. If the metadata says there is a leader, I don't understand why I would get NotCoordinatorForConsumerException.
>>>
>>> > Thanks,
>>> >
>>> > Mayuresh
>>> >
>>> > On Wed, Oct 28, 2015 at 8:23 AM, David Corbin <dcor...@lancope.com> wrote:
>>> >
>>> >> I'm working on a project that I hope to use the SimpleConsumer on. I'm trying to write some test code around our code that is wrapping the SimpleConsumer.
>>> >>
>>> >> The environment is 1 Kafka broker and 1 ZooKeeper.
>>> >> The test goes like this:
>>> >>
>>> >> Create a topic ("foo", 1 partition, 1 replica)
>>> >> Create a Producer, and send a message
>>> >> Create a SimpleConsumer, and try to read the offset
>>> >>
>>> >> Failure with: NotCoordinatorForConsumerException
>>> >>
>>> >> If I continue to retry for an extended period, I continue to get that exception.
>>> >>
>>> >> As near as I can tell, the topic metadata says there is a leader, but the broker thinks it's being rebalanced.
>>> >> I've done the above test immediately after stopping, cleaning out old data, and restarting both ZooKeeper and Kafka.
>>> >>
>>> >> Suggestions welcome.
>>> >
>>> > --
>>> > -Regards,
>>> > Mayuresh R. Gharat
>>> > (862) 250-7125
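For reference, the single-broker integration-test configuration mentioned above amounts to one broker property; lowering the partition count is an additional, optional tweak to make creation finish faster (the value 5 is illustrative, not from the thread):

    # server.properties - single-broker integration test only, not for production
    offsets.topic.replication.factor=1
    # optional: fewer partitions than the default 50 so __consumer_offsets is created faster
    offsets.topic.num.partitions=5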