There's another related bug - triggering offsets topic creation by
requesting metadata about that topic does not work on a clean (no topics
created yet) single-broker Kafka cluster. In that case the sequence
returned by KafkaApis.getAliveBrokers is empty, and
KafkaApis.getTopicMetadata will issue an AdminUtils.createTopic request
with an offsetsTopicReplicationFactor of 3, which fails with

ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest;
Version: 0; CorrelationId: 0; ClientId: ; Topics: __consumer_offsets
(kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 3 larger than
available brokers: 1
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:513)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
    at scala.collection.SetLike$class.map(SetLike.scala:92)
    at scala.collection.AbstractSet.map(Set.scala:47)
    at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
    at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:745)

So this code in KafkaApis.getTopicMetadata

val offsetsTopicReplicationFactor =
  if (aliveBrokers.length > 0)
    Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
  else
    config.offsetsTopicReplicationFactor

should probably look like

val offsetsTopicReplicationFactor =
  if (aliveBrokers.length > 0)
    Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
  else
    Math.min(config.offsetsTopicReplicationFactor,
      ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).length)

Rationale: since aliveBrokers is actually information from the (metadata)
cache, a broker whose cache says that 0 brokers are alive surely has a
cache that has never been set/updated. In that case do not assume that
there are enough (3) brokers up; instead look up in ZooKeeper how many
brokers are alive, and based on that determine the replication factor for
the offsets topic.

Does this make sense?

As a workaround, I guess I will have to resort to explicitly creating the
offsets topic if it doesn't exist already.
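
Something like the following is roughly what I have in mind as that test
setup step - just a sketch; the ensureOffsetsTopic helper name, the 50
partitions, the replication-factor cap of 3 and the cleanup.policy property
are my own assumptions, only the AdminUtils/ZkUtils calls are existing
0.8.2 API:

import java.util.Properties
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient

// Hypothetical test-setup helper: create __consumer_offsets up front so the
// first consumer metadata lookup doesn't race topic creation.
def ensureOffsetsTopic(zkClient: ZkClient): Unit = {
  val topic = "__consumer_offsets" // OffsetManager.OffsetsTopicName in 0.8.2
  if (!AdminUtils.topicExists(zkClient, topic)) {
    // Size the replication factor to the brokers actually registered in ZK,
    // capped at 3 (assumed default), never below 1.
    val liveBrokers =
      ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).length
    val replicationFactor = math.min(3, math.max(liveBrokers, 1))
    val props = new Properties()
    props.put("cleanup.policy", "compact") // offsets topic is log-compacted
    // 50 partitions mirrors the offsets.topic.num.partitions default
    AdminUtils.createTopic(zkClient, topic, 50, replicationFactor, props)
  }
}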

Kind regards,
Stevo Slavic.

On Tue, Oct 6, 2015 at 11:34 AM, Stevo Slavić <ssla...@gmail.com> wrote:

> Debugged, and found in KafkaApis.handleConsumerMetadataRequest that the
> consumer offsets topic gets created on the first lookup of offsets topic
> metadata, even when auto topic creation is disabled.
>
> In that method there is following call:
>
> // get metadata (and create the topic if necessary)
> val offsetsTopicMetadata =
> getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head
>
> and on the first call it returns offsetsTopicMetadata with an empty
> partitionsMetadata sequence and errorCode 5.
>
> By the docs this means:
> LeaderNotAvailable => This error is thrown if we are in the middle of a
> leadership election and there is currently no leader for this partition and
> hence it is unavailable for writes.
>
> Since leadership election takes some time, especially for the default 50
> partitions of the consumer offsets topic, the first consumer metadata
> request will always return an error. This sounds like a bug, especially if
> it's not documented as expected behavior. I'd prefer it if the Kafka
> broker/controller did this init of the consumer offsets topic on startup.
>
> As a workaround, as an init step of my integration test, I will look up
> the offsets topic metadata until successful or a timeout.
>
> Kind regards,
> Stevo Slavic.
>
> On Tue, Oct 6, 2015 at 10:02 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>
>> Thanks Grant for quick reply!
>>
>> I've used the AdminUtils.topicExists("__consumer_offsets") check and even
>> 10 sec after Kafka broker startup, the check fails.
>>
>> When, on which event, does this internal topic get created? Is there some
>> broker config property preventing it from being created? Does one have to
>> use the high-level consumer or make some special request (JoinGroup?) using
>> the simple consumer API to trigger consumer offsets topic init on the broker?
>>
>> I'm using the simple consumer API - I assume exclude.internal.topics,
>> offsets.storage or dual.commit.enabled, however configured, shouldn't affect
>> me, since I'm passing OffsetCommitRequest with version id 1. What's more, I
>> don't even reach the point where the commit is done, since the lookup of the
>> consumer coordinator is throwing ConsumerCoordinatorNotAvailableException.
>>
>> Kind regards,
>> Stevo Slavic.
>>
>> On Mon, Oct 5, 2015 at 5:59 PM, Grant Henke <ghe...@cloudera.com> wrote:
>>
>>> Hi Stevo,
>>>
>>> There are a couple of options to verify the topic exists:
>>>
>>>    1. Consume from a topic with "offsets.storage=kafka". If it's not
>>>    created already, this should create it.
>>>    2. List and describe the topic using the Kafka topics script. Ex:
>>>
>>> bin/kafka-topics.sh --zookeeper localhost:2181 --list
>>>
>>> bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic
>>> __consumer_offsets
>>>
>>>
>>>    3. Check that the ZNode exists in ZooKeeper. Ex:
>>>
>>> bin/zookeeper-shell.sh localhost:2181
>>> ls /brokers/topics/__consumer_offsets
>>>
>>> get /brokers/topics/__consumer_offsets
>>>
>>>
>>> Thanks,
>>> Grant
>>>
>>> On Mon, Oct 5, 2015 at 10:44 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>>>
>>> > Hello Apache Kafka community,
>>> >
>>> > In my integration tests, with a single 0.8.2.2 broker, for a newly
>>> > created topic with a single partition, after determining through a topic
>>> > metadata request that the partition has a lead broker assigned, when I
>>> > try to reset the offset for a given consumer group, I first try to
>>> > discover the offset coordinator, and that lookup is throwing
>>> > ConsumerCoordinatorNotAvailableException
>>> >
>>> > On
>>> > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
>>> > it is documented that the broker returns ConsumerCoordinatorNotAvailableCode
>>> > for consumer metadata requests or offset commit requests if the offsets
>>> > topic has not yet been created.
>>> >
>>> > I wonder if this is really the case, that the offsets topic has not
>>> > been created. Any tips on how to ensure/verify that the offsets topic
>>> > exists?
>>> >
>>> > Kind regards,
>>> >
>>> > Stevo Slavic.
>>> >
>>>
>>>
>>>
>>> --
>>> Grant Henke
>>> Software Engineer | Cloudera
>>> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>>>
>>
>>
>
