> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/common/ErrorMapping.scala, line 45
> > <https://reviews.apache.org/r/13908/diff/1/?file=346515#file346515line45>
> >
> >     Could this error code be renamed to something like 
> > OffsetLoadingNotCompleteCode. Arguably this will convey the error code more 
> > clearly.

Agree. Would include change in v3 patch.


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
> > 280
> > <https://reviews.apache.org/r/13908/diff/1/?file=346517#file346517line280>
> >
> >     It will be good to be specific about which channel the consumer failed 
> > to establish. In this case, let's mention "Unable to establish a channel 
> > for fetching offsets with any of the live brokers in 
> > %s".format(brokers.mkString(','))

Agree. Would include change in v3 patch.


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
> > 321
> > <https://reviews.apache.org/r/13908/diff/1/?file=346517#file346517line321>
> >
> >     Is it a good idea for commitOffsets() to eat up every error that it 
> > encounters ? commitOffsets() is a public API and users want to use it to 
> > commit offsets on demand, manually. These users do not use auto commit 
> > offsets and use commitOffsets() to checkpoint offsets as often as the 
> > application logic dictates. For that use case, if the commitOffsets() has 
> > not actually successfully committed the offsets, the user of the API must 
> > know about it and retry as required. Thoughts?

Correct me if I am wrong: the Producer API does not expose failures to outside 
world. In case of failures, producer would internally retry the failed messages 
but thats behind the hood and would not be visible to the caller. With embedded 
producer, I could not find a way so that consumers would about failures w/o 
modifying the producer code. As "embedded producer" was a temporary hack 
thingy, we refrained from doing modifications in producer code to expose this 
info. This could be something that can be handled in phase #2 ie. using 
OffsetCommitRequest.


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
> > 664
> > <https://reviews.apache.org/r/13908/diff/1/?file=346517#file346517line664>
> >
> >     It is probably better to be clearer on this error message as well. 
> > Something along the lines of "as offset bootstrap is still in progress on 
> > some brokers. This means leadership changed recently for the offsets topic"

This is one of the points that Guozhang raised in his review comment 27.3 and 
now you; strong indication that I have got to change that sloppy message :) The 
loading process would be triggered by (a) broker startup and (b) leadership 
change. I tried to capture both these things in a log message but it looked 
ugly as it was too big to fit in one line. Technically 'broker startup' leads 
to leadership assignment which can be also seen as leadership change. With that 
argument, if we have to not distinguish between (a) and (b), then we could go 
with the message you suggested. Else, change the last part in your suggestion 
to "This means leadership changed recently for the offsets topic or the broker 
is starting up".


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/KafkaServer.scala, line 88
> > <https://reviews.apache.org/r/13908/diff/1/?file=346521#file346521line88>
> >
> >     Curious - why do we need to use the singleton pattern here? Shouldn't 
> > only one thread invoke KafkaServer.startup?

Its made singleton so that even if someone carelessly tries to create multiple 
offset managers on same server instance, there would still be a single offset 
manager. I agree that in current code this will not happen. The penalty of 
having multiple copies offset managers is huge in terms of memory and 
correctness, so made it singleton.


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 17
> > <https://reviews.apache.org/r/13908/diff/1/?file=346522#file346522line17>
> >
> >     this file has turned into a big blob of code. It will help if you can 
> > separate the OffsetManager trait, the DefaultOffsetManager and 
> > ZookeeperOffsetManager into separate files

Agree. Would include change in v3 patch.


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 51
> > <https://reviews.apache.org/r/13908/diff/1/?file=346522#file346522line51>
> >
> >     I think it is best to not include any parameters to the startup() API 
> > as it is difficult to come up with a set of parameters that would work for 
> > all possible offset managers. What might work better is to include a 
> > generic init API that takes in a Properties object. This API initializes 
> > the context required for the offset manager. startup might or might not be 
> > useful if we add init(Properties), I'm not so sure.

> 'include a generic init API that takes in a Properties object' : 
For this, KafkaServer needs to know which offset manager type it needs to 
spawn, then bake the properties relevant to it and pass it to 'init'. This wont 
abstract things from KafkaServer and everytime we add a new offset manager, the 
KafkaServer code must be modified. In the current patch, things are abstracted 
from KafkaServer. I could not figure out a way to achieve both: (a) abstracting 
offset manager type from KafkaServer and (b) making the startup arguments not 
implementation specific. Any suggestions ?


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 70
> > <https://reviews.apache.org/r/13908/diff/1/?file=346522#file346522line70>
> >
> >     load the offsets from the logs is not generic enough. What if the 
> > offsets are stored in a database or custom flat files ?

In the current patch, whatever backend is used to store the offsets (zk or 
in-memory hash table or database or custom file), offsets are always logged in 
the logs of offsets topic. Load would serve as a way to read stuff from those 
logs into the offset managers' storage. This probably seems stupid as if a user 
just wants to use Zk for offsets, logging in the logs, maintaining replicas is 
an overhead. The only advantage with that is while switching across different 
offset manager implementations. ie. if you are using Zk based offset manager, 
all offsets are in the logs and Zk. Switching to inbuilt offset manager would 
be just a config change and broker would start populating in-memory hash table 
from logs once bounced. 
I had raised this point over Jira (comment #2, point (2)) but have not heard 
anything about it. Any suggestions ?


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 74
> > <https://reviews.apache.org/r/13908/diff/1/?file=346522#file346522line74>
> >
> >     Agree with Sriram that this could be named differently. It will also 
> > help if we describe the purpose of each of these APIs clearly. For example, 
> > if I want to store offsets in a database, how do I know why 
> > triggerLoadOffsets is required? Is it used to bootstrap some sort of 
> > offsets cache on startup ? 
> >     
> >     Also try to describe when these APIs will be invoked on the Kafka 
> > server side. That will help the user implement a specific offset manager 
> > relatively easily

If your concern is just about the naming and description then its easily 
fixable. Does 'syncOffsetsFromLogs' seems ok ?


> On Sept. 4, 2013, 11:58 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 257
> > <https://reviews.apache.org/r/13908/diff/1/?file=346522#file346522line257>
> >
> >     There seems to be a race condition that might overwrite a newer offset 
> > with a stale one. This can happen when a broker becomes a leader for some 
> > partition of the offsets topic. When this happens, partition.makeLeader() 
> > exposes the broker as the new leader. At that point, it can start taking in 
> > offset commit requests. An offset commit request can come in at the same 
> > time that triggerLoadOffsets() is being invoked for the same offsets 
> > partition. putOffset() will go through and update the offsets table with 
> > the new offset. It does not touch commitsWhileLoading since loading does 
> > not have the key in it. Then the 1st statement in triggerLoadOffsets is 
> > executed and loading gets the offsets partition added to it. It goes ahead 
> > and updates the offsets table with the old offset since commitsWhileLoading 
> > was not updated by putOffset.

The loading process is changed and in newer patch, timestamp would be stored 
along with offsets. With that, it becomes a easy to prevent for such overwrites.


- Tejas


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/13908/#review25904
-----------------------------------------------------------


On Aug. 30, 2013, 9:19 p.m., Tejas Patil wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/13908/
> -----------------------------------------------------------
> 
> (Updated Aug. 30, 2013, 9:19 p.m.)
> 
> 
> Review request for kafka, Jay Kreps and Neha Narkhede.
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> See https://issues.apache.org/jira/browse/KAFKA-1012 for details.
> 
> 
> Diffs
> -----
> 
>   config/server.properties 7685879c2ab3d2dde1561bd34e6d9c55bc2429e3 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
> a4c5623dbd48d9a0f21b87e39d63cde3604c64a0 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 
> 153bc0b078d21200c02c47dd5ad9b7a7e3326ec4 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
> 59608a34202b4a635582c74c0068f3ae1bde0a13 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> e3a64204513467cef8917f501c3bc0e5b1db2e3e 
>   core/src/main/scala/kafka/producer/DefaultPartitioner.scala 
> 37ddd55b49680262ac348f77bb029dd4e84958cb 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 0ec031ad9423b82ba9c8a49fe984337620392a8b 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> ebbbdea8ab8798c95b39be9594b5b805a0f29d29 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> a925ae1a41fcb71f00ddf9e111172ec8a7fca749 
>   core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 73c87c663981002b52a0c4995a6ef96ca24d5ef4 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> bc415e3156810db6c41509d9eb4aed4484496eee 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> bab436dcef1645b5e327a5e7e68abdbe57604745 
> 
> Diff: https://reviews.apache.org/r/13908/diff/
> 
> 
> Testing
> -------
> 
> Manual testing with 3 brokers, 2 producers and 6 consumers. Existing junits 
> pass
> 
> 
> Thanks,
> 
> Tejas Patil
> 
>

Reply via email to