-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/13908/#review25904
-----------------------------------------------------------



core/src/main/scala/kafka/common/ErrorMapping.scala
<https://reviews.apache.org/r/13908/#comment50538>

    Could this error code be renamed to something like OffsetLoadingNotCompleteCode? Arguably, that would convey the meaning of the error more clearly.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/13908/#comment50544>

    It would be good to be specific about which channel the consumer failed to establish. In this case, let's mention "Unable to establish a channel for fetching offsets with any of the live brokers in %s".format(brokers.mkString(","))



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/13908/#comment50545>

    Is it a good idea for commitOffsets() to swallow every error it encounters? commitOffsets() is a public API, and users call it to commit offsets on demand, manually. These users do not enable auto-commit and instead call commitOffsets() to checkpoint offsets as often as their application logic dictates. For that use case, if commitOffsets() has not actually committed the offsets successfully, the caller must know about it and retry as required. Thoughts?
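
    To illustrate the concern, here is a rough caller-side sketch (mine, not part of the patch), assuming commitOffsets() were changed to propagate failures, e.g. by throwing instead of swallowing exceptions. The retry/backoff policy here is purely hypothetical:

        def checkpointWithRetry(connector: kafka.consumer.ConsumerConnector,
                                maxRetries: Int = 3): Boolean = {
          var attempts = 0
          while (attempts < maxRetries) {
            try {
              connector.commitOffsets   // would need to propagate errors instead of eating them
              return true
            } catch {
              case _: Exception =>
                attempts += 1
                Thread.sleep(1000L * attempts)   // simple linear backoff before retrying
            }
          }
          false
        }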



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/13908/#comment50550>

    It is probably better to make this error message clearer as well. Something along the lines of: "as offset bootstrap is still in progress on some brokers, which means that leadership for the offsets topic changed recently".



core/src/main/scala/kafka/server/KafkaServer.scala
<https://reviews.apache.org/r/13908/#comment50556>

    Curious - why do we need to use the singleton pattern here? Shouldn't only 
one thread invoke KafkaServer.startup?



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/13908/#comment50557>

    This file has turned into a big blob of code. It would help if you separated the OffsetManager trait, DefaultOffsetManager, and ZookeeperOffsetManager into separate files.



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/13908/#comment50552>

    I think it is best not to include any parameters in the startup() API, since it is difficult to come up with a set of parameters that would work for all possible offset managers. What might work better is a generic init API that takes a Properties object and initializes the context required for the offset manager. Whether startup() is still useful once we add init(Properties), I'm not so sure.
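
    Just to make the suggestion concrete, something along these lines (the names below are illustrative only, not the patch's actual code):

        import java.util.Properties

        trait OffsetManager {
          /** Initialize implementation-specific context from arbitrary key/value config. */
          def init(props: Properties): Unit

          /** May become redundant once init(Properties) exists. */
          def startup(): Unit = {}

          def shutdown(): Unit
        }

        // e.g. a database-backed manager pulls only the keys it cares about
        class DbOffsetManager extends OffsetManager {
          private var jdbcUrl: String = _

          override def init(props: Properties): Unit = {
            jdbcUrl = props.getProperty("offset.manager.db.url")   // hypothetical config key
          }

          override def shutdown(): Unit = {}
        }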



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/13908/#comment50555>

    "Load the offsets from the logs" is not generic enough. What if the offsets are stored in a database or in custom flat files?



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/13908/#comment50554>

    Agree with Sriram that this could be named differently. It would also help if we described the purpose of each of these APIs clearly. For example, if I want to store offsets in a database, how do I know why triggerLoadOffsets is required? Is it used to bootstrap some sort of offset cache on startup?
    
    Also, try to describe when these APIs will be invoked on the Kafka server side. That will help users implement a specific offset manager relatively easily.
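
    For instance, doc-comments along these lines would go a long way (method names and wording below are mine, purely illustrative):

        trait OffsetManagerDocSketch {
          /**
           * Invoked when this broker becomes the leader for a partition of the offsets
           * topic, so the manager can bootstrap its offset cache for that partition.
           * A database-backed manager that keeps no cache could make this a no-op.
           */
          def bootstrapOffsetsForPartition(offsetsPartition: Int): Unit

          /** Invoked for every offset commit request routed to this broker. */
          def putOffset(group: String, topic: String, partition: Int, offset: Long): Unit
        }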



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/13908/#comment50577>

    There seems to be a race condition that could overwrite a newer offset with a stale one. It can happen when a broker becomes the leader for some partition of the offsets topic: partition.makeLeader() exposes the broker as the new leader, and at that point it can start taking offset commit requests. An offset commit request can therefore arrive at the same time that triggerLoadOffsets() is being invoked for the same offsets partition. putOffset() goes through and updates the offsets table with the new offset, but does not touch commitsWhileLoading since loading does not yet contain the key. Then the first statement in triggerLoadOffsets() executes and adds the offsets partition to loading. The load then goes ahead and updates the offsets table with the old offset, because commitsWhileLoading was never updated by putOffset().
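
    One rough way to close that window is sketched below (my own names, not the patch's code): mark the offsets partition as loading before it is exposed as leader, and have putOffset() and the load cooperate under the same lock so a commit that races with the load is never clobbered by a stale log entry. This is simplified - for example, commitsWhileLoading would really need to be tracked per offsets partition:

        import scala.collection.mutable

        class OffsetCacheSketch {
          private val lock = new Object
          private val offsets = mutable.Map[String, Long]()
          private val loadingPartitions = mutable.Set[Int]()
          private val commitsWhileLoading = mutable.Set[String]()

          /** Call this before partition.makeLeader() so no commit can slip in beforehand. */
          def markLoading(offsetsPartition: Int): Unit = lock.synchronized {
            loadingPartitions += offsetsPartition
          }

          def putOffset(offsetsPartition: Int, key: String, offset: Long): Unit = lock.synchronized {
            if (loadingPartitions.contains(offsetsPartition))
              commitsWhileLoading += key         // fresher than anything still sitting in the log
            offsets.put(key, offset)
          }

          def loadOffsetsFromLog(offsetsPartition: Int, logEntries: Iterable[(String, Long)]): Unit = {
            logEntries.foreach { case (key, loggedOffset) =>
              lock.synchronized {
                if (!commitsWhileLoading.contains(key))   // never overwrite a newer in-flight commit
                  offsets.put(key, loggedOffset)
              }
            }
            lock.synchronized { loadingPartitions -= offsetsPartition }
          }
        }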


- Neha Narkhede


On Aug. 30, 2013, 9:19 p.m., Tejas Patil wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/13908/
> -----------------------------------------------------------
> 
> (Updated Aug. 30, 2013, 9:19 p.m.)
> 
> 
> Review request for kafka, Jay Kreps and Neha Narkhede.
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> See https://issues.apache.org/jira/browse/KAFKA-1012 for details.
> 
> 
> Diffs
> -----
> 
>   config/server.properties 7685879c2ab3d2dde1561bd34e6d9c55bc2429e3 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
> a4c5623dbd48d9a0f21b87e39d63cde3604c64a0 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 
> 153bc0b078d21200c02c47dd5ad9b7a7e3326ec4 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
> 59608a34202b4a635582c74c0068f3ae1bde0a13 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> e3a64204513467cef8917f501c3bc0e5b1db2e3e 
>   core/src/main/scala/kafka/producer/DefaultPartitioner.scala 
> 37ddd55b49680262ac348f77bb029dd4e84958cb 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 0ec031ad9423b82ba9c8a49fe984337620392a8b 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> ebbbdea8ab8798c95b39be9594b5b805a0f29d29 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> a925ae1a41fcb71f00ddf9e111172ec8a7fca749 
>   core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 73c87c663981002b52a0c4995a6ef96ca24d5ef4 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> bc415e3156810db6c41509d9eb4aed4484496eee 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> bab436dcef1645b5e327a5e7e68abdbe57604745 
> 
> Diff: https://reviews.apache.org/r/13908/diff/
> 
> 
> Testing
> -------
> 
> Manual testing with 3 brokers, 2 producers and 6 consumers. Existing junits 
> pass
> 
> 
> Thanks,
> 
> Tejas Patil
> 
>
