Thank you for the detailed explanation.

Is it essential that offsets be stored in Kafka, or could they be stored
outside the kafka/zookeeper system?  Will it affect how logs are managed,
and when “older” messages are purged? Or are they two independent systems?

On 11/2/15, 03:51, "Stevo Slavić" <ssla...@gmail.com> wrote:

>Here is a bit longer and more detailed, not necessarily better
>understandable explanation.
>
>When using Kafka for offsets storage, consumer offsets get stored in
>(compacted) consumer offsets Kafka topic ("__consumer_offsets"). Every
>partition of consumer offsets topic could store offsets from multiple
>consumer groups, but offsets of a single consumer group will always be
>stored in (get mapped to) same partition of consumer offsets topic (e.g.
>consumer group x and y offsets could be both stored in partition 0, while
>consumer group z offsets could be stored in partition 49 of consumer
>offsets topic; even if consumer group x is consuming two different topics,
>offsets would be stored in same partition of consumer offsets topic). Like
>any other Kafka topic partition, one can read/write (consumer offsets)
>topic partitions only from their lead brokers, and every partition has
>only
>one lead broker. Lead broker of a partition where offsets are stored for
>given consumer group is called consumer coordinator broker for that
>consumer group. To fetch/commit offsets for consumer group, you first need
>to resolve consumer coordinator broker for that consumer group, and then
>send fetch/commit offsets to that broker (btw, it's dynamic, can change
>even after just being resolved which broker is coordinator broker for
>given
>consumer group, e.g. when initial lead broker is lost and a replica
>becomes
>a new lead). Until there is a leader assigned for a partition and broker
>who is leader is not yet aware of that assignment, topic partition cannot
>be read/written, it's offline - read/write requests made to that broker
>will return error code in response. Consumer offsets topic is not created
>automatically on broker/cluster startup - instead it gets created on first
>(direct or indirect) request for consumer offsets topic metadata... That
>lookup triggers creation of consumer offsets topic, parts of the topic
>creation process happen async to request for topic creation, and it can
>take time for topic creation to actually fully finish. Especially if you
>leave consumer offsets topic default settings (replication factor of 3, 50
>partitions) consumer offsets topic creation can take time - this is nice
>default for production cluster, but not for one used in integration tests
>with single broker.
>
>When you try to fetch (or commit) offset for the first time, broker
>internally retrieves metadata about consumer offsets topic, which
>initiates
>consumer offsets topic creation when it doesn't exist, but that first
>request to fetch offset (usually) fails, since chosen consumer coordinator
>broker is not yet aware that it is coordinator for that group, and sends
>back NotCoordinatorForConsumerCode/NotCoordinatorForConsumerException.
>Docs
>(see
>https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Pro
>tocol
>) state that "The broker returns this error code if it receives an offset
>fetch or commit request for a consumer group that it is not a coordinator
>for."
>
>You could modify your code to retry fetching offsets, but not infinitely,
>or you could trigger consumer offsets topic init before fetching offsets.
>For the init option you have (at least) two alternatives.
>
>Modify your test, before reading/committing offsets but not before
>creating
>your topic to init (trigger creation of) consumer offsets topic by looking
>up consumer offsets topic metadata, and add some delay to let consumer
>offsets topic be fully created (all of its default 50 partitions get
>leader
>assigned, broker aware, etc.).
>
>You could do this consumer offsets topic initialization before creating
>your topic, but then make sure that in broker configuration replication
>factor for consumer offsets topic is not higher than number of brokers (1
>in scenario your described) - otherwise consumer offsets topic creation
>will fail. It would fail since fresh broker in single broker cluster, if
>it
>receives a request for consumer offsets topic metadata, will fail to
>create
>consumer offsets topic, no matter how long delay after metadata lookup; it
>is not aware of how many live brokers there are in cluster and will just
>try to create consumer offsets topic with default replication factor of 3
>and that will fail, initial topic partitions replica assignment happens
>during topic creation and replication factor has to be <= than number of
>live brokers when topic is created. Consumer offsets topic creation does
>not fail (although it takes time to finish), if it is done after some
>other
>topic creation has been requested, because that other topic creation
>request makes broker aware (updates its cache) that it is a sole live
>broker in the cluster, and then consumer offsets topic creation will
>ignore
>requested/configured replication factor of 3 and will (silently) fallback
>and use replication factor of 1 (= number of live brokers in cluster).
>
>Maybe things would be cleaner if topic creation allowed non-live brokers
>to
>be used in replica assignment. Then not only would (consumer offsets)
>topic
>creation not fail if there are not enough of live brokers to meet
>configured replication factor, but even better Kafka cluster controller
>broker could do this initialization of consumer offsets topic if it
>doesn't
>already exist.
>
>Btw, looking up topic metadata in zookeeper and checking that all of its
>partitions have a leader assigned is not enough for topic partitions to be
>writable/readable/online - if broker is still not aware that it is the
>leader for a given partition (e.g. broker metadata cache is outdated),
>that
>partition would still not be writable/readable. One could lookup topic
>metadata from Kafka brokers, and more specifically from lead brokers of
>partitions to check if they are ware that they are lead brokers for that
>partition. Not even that is guarantee that publishing will be successful,
>since leadership can change at any moment, and other things can fail (e.g.
>unreliable network), so one has to handle errors. But, as you see problem
>is not with your topic, but with internal consumer offsets topic.
>
>
>On Mon, Nov 2, 2015 at 1:56 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>
>> Hello David,
>>
>> In short, problem is not with your topic, it is with consumer offsets
>> topic initialization.
>>
>> You could modify your code to just retry fetching offsets (until
>> successful where successful is also return of -1 offset, or timeout), or
>> alternatively you could trigger consumer offsets topic init (by fetching
>> consumer offsets topic metadata from broker) before issuing offset fetch
>> request.
>>
>> For the init option you have (at least) two alternatives: do it after or
>> before request to create your topic.
>>
>> If you chose to do consumer offsets topic init before request to create
>> your (first) topic, make sure to configure broker with replication
>>factor
>> of 1 for consumer offsets topic (offsets.topic.replication.factor config
>> property) otherwise consumer offsets topic init will fail. Do this
>>config
>> change only for integration test with single broker, but not for
>>production
>> cluster which should have 3 or more brokers anyway.
>>
>> Kind regards,
>> Stevo Slavic.
>>
>>
>>
>> On Sun, Nov 1, 2015 at 9:38 PM, David Corbin <dcor...@lancope.com>
>>wrote:
>>
>>> Yes.  I know all of that.  I guess my original message was not clear.
>>>
>>> The topic metadata indicates that there is a leader.
>>>
>>> Please see my comments interspersed below.
>>> Thanks
>>> David Corbin
>>>
>>> On 10/29/15, 17:53, "Mayuresh Gharat" <gharatmayures...@gmail.com>
>>>wrote:
>>>
>>> > NotCoordinatorForConsumerException is for consumer side.
>>>
>>> I am attempting to consume messages, so this means I¹m getting an
>>> exception that might be reasonable for what I¹m doing.
>>>
>>> >I think if you
>>> >are using simpleConsumer, then you have to do your own offset
>>>management.
>>>
>>> I am attempting to manage my offsets myself; that¹s why I called
>>> SimpleConsumer#fetchOffsets() which is throwing the exception.  I
>>>assume
>>> you are not suggesting that my offset management should be done
>>>entirely
>>> ³outside the scope² of Kafka.  If that is true, why do 4 of 3 methods
>>>on
>>> SimpleConsumer refer to offsets?
>>>
>>>
>>> >
>>> >TopicMetadata tells you whether there is a leader for the topic
>>> >partitions,
>>> >replicas, ISR.
>>>
>>>
>>> Yes.  After I create the topic, I ³poll² the metadata until the one
>>> topic/partition has a leader, before moving on.  If the metadata says
>>> there is a leader, I don¹t understand whyI would get
>>> NotCoordinatorForConsumerException
>>>
>>> >
>>> >Thanks,
>>> >
>>> >Mayuresh
>>> >
>>> >On Wed, Oct 28, 2015 at 8:23 AM, David Corbin <dcor...@lancope.com>
>>> wrote:
>>> >
>>> >> I'm working on a project that I hope to use the SimpleConsumer on.
>>>I'm
>>> >> trying to write some test code around our code that is wrapping the
>>> >> SimpleConsumer.
>>> >>
>>> >> The environment is 1 kafka broker, and 1 zookeeper
>>> >> The test goes like this:
>>> >>
>>> >> Create a topic ("foo", 1 partition, 1 replica)
>>> >> Create a Producer, and send a message
>>> >> Create a SimpleConsumer, and try to read the offset
>>> >>
>>> >> Failure with: NotCoordinatorForConsumerException
>>> >>
>>> >> If I continue to require for an extended period, I continue to get
>>>that
>>> >> exception.
>>> >>
>>> >> As near as I can tell, the topic metadata says there is a leader,
>>>but
>>> >>the
>>> >> the broker thinks it's being rebalanced.
>>> >> I've done the above test immediately after stop, clean out old data,
>>> and
>>> >> restart both zookeeper and kafka.
>>> >>
>>> >> Suggestions welcome.
>>> >>
>>> >>
>>> >
>>> >
>>> >--
>>> >-Regards,
>>> >Mayuresh R. Gharat
>>> >(862) 250-7125
>>>
>>>
>>

Reply via email to