-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review70836
-----------------------------------------------------------


clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
<https://reviews.apache.org/r/27391/#comment116277>

    remove both


clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
<https://reviews.apache.org/r/27391/#comment116310>

    Should we revert this rename since this is part of the public API? I
    would be surprised if people are using it though - but still.


core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116314>

    Our convention is to include the "if" on the previous line.


core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116315>

    same here


core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116316>

    and here


core/src/main/scala/kafka/api/OffsetCommitRequest.scala
<https://reviews.apache.org/r/27391/#comment116317>

    and here


core/src/main/scala/kafka/api/OffsetFetchRequest.scala
<https://reviews.apache.org/r/27391/#comment116318>

    Can you run "organize imports"? Some of these seem
    redundant/unnecessary.


core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
<https://reviews.apache.org/r/27391/#comment116320>

    (This is also a public API change - although you did add an Object
    wrapper further down that comes close to the original API.)


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/27391/#comment116322>

    Shouldn't the commit timestamp _always_ be set to the current time?

    What I was thinking is this:

    If v0:
    - An explicit timestamp is provided only to override the v0 default
      retention, which is to add the server-side retention to the current
      timestamp. The (true) commit timestamp - i.e., the receive time - is
      useful for debugging purposes. So if an explicit timestamp is
      provided in v0, then use it as the absolute expire timestamp, so
      you would store
      (commitTimestamp = now, expireTimestamp = given commit timestamp);
      if v0 and the commit timestamp is the default, then you would store
      (commitTimestamp = now, expireTimestamp = now + offsetRetention)
    - if v1: (commitTimestamp = now, expireTimestamp = now + offsetRetention)

    This way, you should have correct expiration behavior for v0, v1 and
    v2 and at the same time have the true commit timestamp - i.e., the
    receive time at the broker, which is useful for debugging.

    (also see comment in OffsetManager)


core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment116374>

    Should we call this maxOffsetRetentionMs instead?


core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/27391/#comment116372>

    follow-up from above comment... and here you would set:
    commitTimestamp = timestamp
    expireTimestamp = timestamp

    So do you think this would work overall? I could be wrong - this
    patch has proven to be much trickier than we originally thought.


- Joel Koshy


On Jan. 24, 2015, 12:06 a.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27391/
> -----------------------------------------------------------
>
> (Updated Jan. 24, 2015, 12:06 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1634
>     https://issues.apache.org/jira/browse/KAFKA-1634
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Incorporated Jun's comments
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4
>   core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26
>   core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971
>   core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4
>   core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a
>   core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5
>
> Diff: https://reviews.apache.org/r/27391/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Guozhang Wang
>
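
The timestamp rules proposed in the KafkaApis.scala comment above can be
sketched roughly as follows. This is only an illustration in Java, not code
from the patch: the class OffsetTimestamps, the method resolve, and the -1
sentinel for "no explicit timestamp" are all assumptions made for the sketch.
v2 is left out because the comment does not spell out its rule.

```java
// Illustrative sketch only; names and the sentinel value are hypothetical.
final class OffsetTimestamps {
    // Assumed sentinel meaning "the request carried no explicit timestamp".
    static final long DEFAULT_TIMESTAMP = -1L;

    final long commitTimestamp;  // receive time at the broker
    final long expireTimestamp;  // absolute time at which the offset expires

    private OffsetTimestamps(long commitTimestamp, long expireTimestamp) {
        this.commitTimestamp = commitTimestamp;
        this.expireTimestamp = expireTimestamp;
    }

    /**
     * Applies the rules as written in the review comment:
     *   v0, explicit timestamp: commit = now, expire = given timestamp
     *   v0, default timestamp:  commit = now, expire = now + offsetRetention
     *   v1:                     commit = now, expire = now + offsetRetention
     */
    static OffsetTimestamps resolve(int version, long givenTimestamp,
                                    long now, long offsetRetentionMs) {
        if (version != 0 && version != 1) {
            throw new IllegalArgumentException(
                "rule for version " + version + " is not covered by this sketch");
        }
        boolean explicit = version == 0 && givenTimestamp != DEFAULT_TIMESTAMP;
        long expire = explicit ? givenTimestamp : now + offsetRetentionMs;
        // The commit timestamp is always the broker's receive time.
        return new OffsetTimestamps(now, expire);
    }
}
```

For example, resolve(0, 5000L, 1000L, 100L) stores the receive time 1000 as
the commit timestamp and treats the supplied 5000 as the expire timestamp,
while the same call with DEFAULT_TIMESTAMP falls back to now + retention.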