If I'm not mistaken, the replication factor of a topic does not get stored in ZK - on creation, the replication factor gets translated into a topic replica assignment, and it is that assignment which gets stored in ZK.
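For illustration, a rough sketch of what I mean, using the 0.8.x AdminUtils call that also shows up in the stack trace quoted below (exact signature may differ, so take it as a sketch only):

    import kafka.admin.AdminUtils

    // Replication factor 3 is only an input to the assignment algorithm at creation time.
    // With live broker ids 0, 1, 2 and 50 partitions it gets expanded into a
    // Map(partitionId -> replica broker ids), e.g. Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1), ...)
    val liveBrokerIds = Seq(0, 1, 2)
    val assignment = AdminUtils.assignReplicasToBrokers(liveBrokerIds, 50, 3)
    // It is this expanded assignment, not the factor itself, that ends up under
    // /brokers/topics/<topic> in ZK.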
Again, please correct me if I'm wrong - it seems that it is only during topic creation that all replicas in the replica assignment are verified to be live brokers. Once a topic is created it can become under-replicated - brokers from its replica assignment can go down - and even in that case the topic may still be usable (online), depending on other settings.

Would it make sense, either as an exception for the consumer offsets topic only or as a rule for all topics, to allow a topic to be created with a replica assignment that references brokers which do not exist yet (relying only on the monotonically increasing broker id convention) or which are simply not alive? None of the brokers referenced in the replica assignment would have to be alive, or maybe just one of them would, or this could be made a configuration option - a switch controlling whether to enforce that brokers referenced in the replica assignment are alive, a minimum number of alive brokers in the replica assignment, or both.
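Concretely, what I have in mind is being able to create a topic with a replica assignment like the following rough sketch, where broker ids 3 and 4 are hypothetical and not (yet) registered in ZK:

    // Hypothetical desired assignment for a 2-partition topic, replication factor 2.
    // Broker ids 3 and 4 do not exist yet; only broker 0 is alive.
    val desiredAssignment: Map[Int, Seq[Int]] = Map(
      0 -> Seq(0, 3), // partition 0: one live broker, one future broker
      1 -> Seq(3, 4)  // partition 1: future brokers only
    )

As far as I can tell, creating a topic with such an assignment is rejected today; relaxing that check is what I'm proposing.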
Kind regards,
Stevo Slavic.

On Thu, Oct 8, 2015 at 3:32 PM, Stevo Slavić <ssla...@gmail.com> wrote:

> There's another related bug - triggering offsets topic creation by requesting metadata for that topic does not work on a clean (no topics created yet) single-broker Kafka cluster. In that case the sequence returned by KafkaApis.getAliveBrokers is empty, and KafkaApis.getTopicMetadata will issue an AdminUtils.createTopic request with offsetsTopicReplicationFactor of 3, which fails with:
>
> ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics: __consumer_offsets (kafka.server.KafkaApis)
> kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
>   at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
>   at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
>   at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:513)
>   at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>   at scala.collection.SetLike$class.map(SetLike.scala:92)
>   at scala.collection.AbstractSet.map(Set.scala:47)
>   at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
>   at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>   at java.lang.Thread.run(Thread.java:745)
>
> So this code in KafkaApis.getTopicMetadata
>
>     val offsetsTopicReplicationFactor =
>       if (aliveBrokers.length > 0)
>         Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
>       else
>         config.offsetsTopicReplicationFactor
>
> should probably look like
>
>     val offsetsTopicReplicationFactor =
>       if (aliveBrokers.length > 0)
>         Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
>       else
>         Math.min(config.offsetsTopicReplicationFactor,
>           ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).length)
>
> Rationale: aliveBrokers is actually information from the (metadata) cache. If a broker has in its cache the information that there are 0 brokers alive, that surely means the cache has never been set/updated; in that case do not assume that there are enough (3) brokers up, but instead look up in ZK how many brokers are alive, and based on that determine the replication factor for the offsets topic.
>
> Does this make sense?
>
> As a workaround, I guess I will have to resort to explicitly creating the offsets topic if it doesn't exist already.
>
> Kind regards,
> Stevo Slavic.
>
> On Tue, Oct 6, 2015 at 11:34 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>
>> Debugged, and found in KafkaApis.handleConsumerMetadataRequest that the consumer offsets topic gets created on the first lookup of offsets topic metadata, even when auto topic creation is disabled.
>>
>> In that method there is the following call:
>>
>>     // get metadata (and create the topic if necessary)
>>     val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head
>>
>> and on the first call it returns offsetsTopicMetadata with an empty partitionsMetadata sequence and errorCode 5.
>>
>> By the docs this means:
>> LeaderNotAvailable => This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes.
>>
>> Since leadership election takes some time, especially for the default 50 partitions of the consumer offsets topic, the first consumer metadata lookup request will always return an error. This sounds like a bug, especially if it's not documented as expected behavior. I'd prefer it if the Kafka broker/controller did this init of the consumer offsets topic on startup.
>>
>> As a workaround, as an init step of my integration test, I will look up offsets topic metadata until it succeeds or a timeout is reached.
>>
>> Kind regards,
>> Stevo Slavic.
>>
>> On Tue, Oct 6, 2015 at 10:02 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>>
>>> Thanks Grant for the quick reply!
>>>
>>> I've used the AdminUtils.topicExists("__consumer_offsets") check, and even 10 sec after Kafka broker startup the check fails.
>>>
>>> When, on which event, does this internal topic get created? Is there some broker config property preventing it from being created? Does one have to use the high level consumer, or make some special request (JoinGroup?) using the simple consumer API, to trigger consumer offsets topic init on the broker?
>>>
>>> I'm using the simple consumer API - I assume exclude.internal.topics, offsets.storage or dual.commit.enabled, however configured, shouldn't affect me, since I'm passing an OffsetCommitRequest with version id 1, and moreover I do not even reach the point where the commit is done, since the lookup of the consumer coordinator is throwing ConsumerCoordinatorNotAvailableException.
>>>
>>> Kind regards,
>>> Stevo Slavic.
>>>
>>> On Mon, Oct 5, 2015 at 5:59 PM, Grant Henke <ghe...@cloudera.com> wrote:
>>>
>>>> Hi Stevo,
>>>>
>>>> There are a couple of options to verify the topic exists:
>>>>
>>>> 1. Consume from a topic with "offsets.storage=kafka". If it's not created already, this should create it.
>>>> 2. List and describe the topic using the Kafka topics script. Ex:
>>>>
>>>>     bin/kafka-topics.sh --zookeeper localhost:2181 --list
>>>>     bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic __consumer_offsets
>>>>
>>>> 3. Check the ZNode exists in Zookeeper. Ex:
>>>>
>>>>     bin/zookeeper-shell.sh localhost:2181
>>>>     ls /brokers/topics/__consumer_offsets
>>>>     get /brokers/topics/__consumer_offsets
>>>>
>>>> Thanks,
>>>> Grant
>>>>
>>>> On Mon, Oct 5, 2015 at 10:44 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>>>>
>>>> > Hello Apache Kafka community,
>>>> >
>>>> > In my integration tests, with a single 0.8.2.2 broker, for a newly created topic with a single partition, after determining through a topic metadata request that the partition has a lead broker assigned, when I try to reset the offset for a given consumer group I first try to discover the offset coordinator, and that lookup is throwing ConsumerCoordinatorNotAvailableException.
>>>> >
>>>> > On
>>>> > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
>>>> > it is documented that the broker returns ConsumerCoordinatorNotAvailableCode for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.
>>>> >
>>>> > I wonder if this is really the case, that the offsets topic has not been created. Any tips on how to ensure/verify that the offsets topic exists?
>>>> >
>>>> > Kind regards,
>>>> >
>>>> > Stevo Slavic.
>>>>
>>>> --
>>>> Grant Henke
>>>> Software Engineer | Cloudera
>>>> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke