----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/13908/#review25904 -----------------------------------------------------------
core/src/main/scala/kafka/common/ErrorMapping.scala <https://reviews.apache.org/r/13908/#comment50538> Could this error code be renamed to something like OffsetLoadingNotCompleteCode. Arguably this will convey the error code more clearly. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala <https://reviews.apache.org/r/13908/#comment50544> It will be good to be specific about which channel the consumer failed to establish. In this case, let's mention "Unable to establish a channel for fetching offsets with any of the live brokers in %s".format(brokers.mkString(',')) core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala <https://reviews.apache.org/r/13908/#comment50545> Is it a good idea for commitOffsets() to eat up every error that it encounters ? commitOffsets() is a public API and users want to use it to commit offsets on demand, manually. These users do not use auto commit offsets and use commitOffsets() to checkpoint offsets as often as the application logic dictates. For that use case, if the commitOffsets() has not actually successfully committed the offsets, the user of the API must know about it and retry as required. Thoughts? core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala <https://reviews.apache.org/r/13908/#comment50550> It is probably better to be clearer on this error message as well. Something along the lines of "as offset bootstrap is still in progress on some brokers. This means leadership changed recently for the offsets topic" core/src/main/scala/kafka/server/KafkaServer.scala <https://reviews.apache.org/r/13908/#comment50556> Curious - why do we need to use the singleton pattern here? Shouldn't only one thread invoke KafkaServer.startup? core/src/main/scala/kafka/server/OffsetManager.scala <https://reviews.apache.org/r/13908/#comment50557> this file has turned into a big blob of code. It will help if you can separate the OffsetManager trait, the DefaultOffsetManager and ZookeeperOffsetManager into separate files core/src/main/scala/kafka/server/OffsetManager.scala <https://reviews.apache.org/r/13908/#comment50552> I think it is best to not include any parameters to the startup() API as it is difficult to come up with a set of parameters that would work for all possible offset managers. What might work better is to include a generic init API that takes in a Properties object. This API initializes the context required for the offset manager. startup might or might not be useful if we add init(Properties), I'm not so sure. core/src/main/scala/kafka/server/OffsetManager.scala <https://reviews.apache.org/r/13908/#comment50555> load the offsets from the logs is not generic enough. What if the offsets are stored in a database or custom flat files ? core/src/main/scala/kafka/server/OffsetManager.scala <https://reviews.apache.org/r/13908/#comment50554> Agree with Sriram that this could be named differently. It will also help if we describe the purpose of each of these APIs clearly. For example, if I want to store offsets in a database, how do I know why triggerLoadOffsets is required? Is it used to bootstrap some sort of offsets cache on startup ? Also try to describe when these APIs will be invoked on the Kafka server side. That will help the user implement a specific offset manager relatively easily core/src/main/scala/kafka/server/OffsetManager.scala <https://reviews.apache.org/r/13908/#comment50577> There seems to be a race condition that might overwrite a newer offset with a stale one. This can happen when a broker becomes a leader for some partition of the offsets topic. When this happens, partition.makeLeader() exposes the broker as the new leader. At that point, it can start taking in offset commit requests. An offset commit request can come in at the same time that triggerLoadOffsets() is being invoked for the same offsets partition. putOffset() will go through and update the offsets table with the new offset. It does not touch commitsWhileLoading since loading does not have the key in it. Then the 1st statement in triggerLoadOffsets is executed and loading gets the offsets partition added to it. It goes ahead and updates the offsets table with the old offset since commitsWhileLoading was not updated by putOffset. - Neha Narkhede On Aug. 30, 2013, 9:19 p.m., Tejas Patil wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/13908/ > ----------------------------------------------------------- > > (Updated Aug. 30, 2013, 9:19 p.m.) > > > Review request for kafka, Jay Kreps and Neha Narkhede. > > > Repository: kafka > > > Description > ------- > > See https://issues.apache.org/jira/browse/KAFKA-1012 for details. > > > Diffs > ----- > > config/server.properties 7685879c2ab3d2dde1561bd34e6d9c55bc2429e3 > core/src/main/scala/kafka/api/OffsetFetchRequest.scala > a4c5623dbd48d9a0f21b87e39d63cde3604c64a0 > core/src/main/scala/kafka/common/ErrorMapping.scala > 153bc0b078d21200c02c47dd5ad9b7a7e3326ec4 > core/src/main/scala/kafka/common/OffsetMetadataAndError.scala > 59608a34202b4a635582c74c0068f3ae1bde0a13 > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > e3a64204513467cef8917f501c3bc0e5b1db2e3e > core/src/main/scala/kafka/producer/DefaultPartitioner.scala > 37ddd55b49680262ac348f77bb029dd4e84958cb > core/src/main/scala/kafka/server/KafkaApis.scala > 0ec031ad9423b82ba9c8a49fe984337620392a8b > core/src/main/scala/kafka/server/KafkaConfig.scala > ebbbdea8ab8798c95b39be9594b5b805a0f29d29 > core/src/main/scala/kafka/server/KafkaServer.scala > a925ae1a41fcb71f00ddf9e111172ec8a7fca749 > core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION > core/src/main/scala/kafka/server/ReplicaManager.scala > 73c87c663981002b52a0c4995a6ef96ca24d5ef4 > core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala > bc415e3156810db6c41509d9eb4aed4484496eee > core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala > c0475d07a778ff957ad266c08a7a81ea500debd2 > core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala > bab436dcef1645b5e327a5e7e68abdbe57604745 > > Diff: https://reviews.apache.org/r/13908/diff/ > > > Testing > ------- > > Manual testing with 3 brokers, 2 producers and 6 consumers. Existing junits > pass > > > Thanks, > > Tejas Patil > >