-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review70836
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
<https://reviews.apache.org/r/27391/#comment116277>

    remove both



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
<https://reviews.apache.org/r/27391/#comment116310>

    Should we revert this rename since this is part of the public API? I would
    be surprised if people are using it though - but still.



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116314>

    I think our convention is to put the if on the previous line.



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116315>

    same here



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116316>

    and here



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116317>

    and here



core/src/main/scala/kafka/api/OffsetFetchRequest.scala
<https://reviews.apache.org/r/27391/#comment116318>

    Can you run "organize imports"? Some of these seem redundant/unnecessary.



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
<https://reviews.apache.org/r/27391/#comment116320>

    (This is also a public API change - although you did add an Object wrapper
    further down that comes close to the original API.)



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/27391/#comment116322>

    Shouldn't the commit timestamp _always_ be set to the current time?
    
    What I was thinking is this:
    - if v0: an explicit timestamp is provided only to override the v0
      default retention, which is to add the server-side retention to the
      current timestamp. The (true) commit timestamp, i.e., the receive time,
      is useful for debugging purposes. So:
      - if an explicit timestamp is provided, use it as the absolute expire
        timestamp and store (commitTimestamp = now, expireTimestamp = given
        commit timestamp)
      - if the commit timestamp is the default, store (commitTimestamp = now,
        expireTimestamp = now + offsetRetention)
    - if v1: (commitTimestamp = now, expireTimestamp = now + offsetRetention)
    
    This way, you should have correct expiration behavior for v0, v1 and v2,
    and at the same time have the true commit timestamp, i.e., the receive
    time at the broker, which is useful for debugging. (Also see comment in
    OffsetManager.)
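    
    To make the cases above concrete, here is a minimal sketch in Java (the
    real handling would live in KafkaApis.scala). The class, method and
    parameter names are hypothetical, DEFAULT_TIMESTAMP = -1 is an assumed
    sentinel for "no explicit timestamp", and v2 is left out because it is
    not spelled out above.

```java
// Hypothetical holder for the two timestamps stored with a committed offset.
final class OffsetTimestamps {
    // Assumed sentinel meaning "the client did not supply a timestamp".
    static final long DEFAULT_TIMESTAMP = -1L;

    final long commitTimestamp; // always the receive time at the broker
    final long expireTimestamp; // absolute time after which the offset may be removed

    OffsetTimestamps(long commitTimestamp, long expireTimestamp) {
        this.commitTimestamp = commitTimestamp;
        this.expireTimestamp = expireTimestamp;
    }

    // now: broker receive time in ms; offsetRetentionMs: retention to apply
    // (where that value comes from is outside the scope of this sketch).
    static OffsetTimestamps resolve(int version, long requestTimestamp,
                                    long now, long offsetRetentionMs) {
        if (version == 0 && requestTimestamp != DEFAULT_TIMESTAMP) {
            // v0 with an explicit timestamp: it only overrides the expiration.
            return new OffsetTimestamps(now, requestTimestamp);
        }
        if (version == 0 || version == 1) {
            // v0 with the default timestamp, or v1: expire after the retention.
            return new OffsetTimestamps(now, now + offsetRetentionMs);
        }
        throw new IllegalArgumentException(
            "version not covered by this sketch: " + version);
    }
}
```

    In every branch commitTimestamp is the receive time, so it stays usable
    for debugging regardless of what the client sent.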



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment116374>

    Should we call this maxOffsetRetentionMs instead?



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment116372>

    follow-up from above comment...
    and here you would set:
    commitTimestamp = timestamp
    expireTimestamp = timestamp
    
    So do you think this would work overall?
    
    I could be wrong - this patch has proven to be much trickier than we
    originally thought.


- Joel Koshy


On Jan. 24, 2015, 12:06 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
> 
> (Updated Jan. 24, 2015, 12:06 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Jun's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 
>   core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 
>   core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
> 
> Diff: https://reviews.apache.org/r/27391/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
