> On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java, > > line 152 > > <https://reviews.apache.org/r/27391/diff/2/?file=753752#file753752line152> > > > > This confused me a bit, and I think it is because initCommonFields is > > intended to initialize fields common to all versions of the request. It is > > a useful helper method but it becomes somewhat clunky when removing fields. > > The partition-level timestamp is no longer a common field. > > > > If this is v2 then we should _never_ set anything in the timestamp > > field of the struct; and if it is < v2 then we should _always_ set the > > timestamp field (even if it is the default). However, since the timestamp > > field in the Field declaration for OFFSET_COMMIT_REQUEST_PARTITION_V0 does > > not have a default explicitly specified, I think this will break with a > > SchemaException("missing value...") for offset commit request v0, v1 if we > > choose to write to a bytebuffer under those versions with this code. > > > > One option is to explicitly pass in the constructor version (0, 1, 2) > > to initCommonFields and use that to decide whether to include/exclude this > > field, but that is weird. Another alternative is a separate helper method > > for v0v1. That is even weirder. > > Guozhang Wang wrote: > Actually, the partition-level timestamp is still a commen field (we are > just deprecating it, and chose to not serialize / de-ser in v2). I agree this > is a bit wired as it is written in this way, I thought about this when I > started the first version but did not come up with a better approach.
Agree that it is still "common" in the object but it is completely removed from the wire protocol - i.e., OFFSET_COMMIT_REQUEST_PARTITION_V1 which is in the V2 OffsetCommitRequest does not have a timestamp. This method should probably be read as "initCommonFieldsInStruct" - i.e., effectively the wire protocol. That said, I'm loathe to add another init method which reads initCommonFieldsInV0AndV1. So I think rather than checking fetchPartitionData.timestamp it would be better to explicitly check the (already set) request version in the struct. If v0 or v1 then set the timestamp key name. > On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: > > core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 22 > > <https://reviews.apache.org/r/27391/diff/2/?file=753755#file753755line22> > > > > Can we mark this @Deprecated as well? > > > > We should probably make the primary constructor without timestamp and > > add a secondary constructor with timestamp and mark deprecated there. > > > > Also, can we use case class.copy if timestamp needs to be modified? > > However, per comment further down I don't think it needs to be touched. > > Guozhang Wang wrote: > Actually we cannot make it deprecated as it will be preserved even in the > new version, right? Note this is not used for the wire protocol but for the > cache / disk format. > > Guozhang Wang wrote: > I should say "not only for the wire protocol but also for cache disk > storage format". And thinking about this twice, I will change to two separate > classes, one for wire protocol and one for server storage format. Yes that is what I was thinking - we should ideally have a separate wire protocol and storage format. > On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: > > core/src/main/scala/kafka/server/OffsetManager.scala, line 498 > > <https://reviews.apache.org/r/27391/diff/2/?file=753758#file753758line498> > > > > Actually, since we always set the retention period (for v0, v1) in > > KafkaApis do we need to even touch this timestamp? i.e., we should > > basically ignore it right? So we only need to do: > > value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp). > > Guozhang Wang wrote: > I think note. In v0/v1, if the timestamp is explicitly specified (i.e. > not -1) we need to use it as the expiration timestamp, or at least that was > how I understood the semantics. > > Guozhang Wang wrote: > "I think we cannot not" Right - what I meant was in KafkaApis we can just compute the retentionPeriod if v0 or v1. So if vo/v1 and timestamp = (now + 7 days), then set retention to 7 days. - Joel ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review60285 ----------------------------------------------------------- On Nov. 8, 2014, 12:54 a.m., Guozhang Wang wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/27391/ > ----------------------------------------------------------- > > (Updated Nov. 8, 2014, 12:54 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1634 > https://issues.apache.org/jira/browse/KAFKA-1634 > > > Repository: kafka > > > Description > ------- > > The timestamp field of OffsetAndMetadata is preserved since we need to be > backward compatible with older versions > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java > 7517b879866fc5dad5f8d8ad30636da8bbe7784a > > clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java > 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f > > clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java > df37fc6d8f0db0b8192a948426af603be3444da4 > core/src/main/scala/kafka/api/OffsetCommitRequest.scala > 050615c72efe7dbaa4634f53943bd73273d20ffb > core/src/main/scala/kafka/api/OffsetFetchRequest.scala > c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 > core/src/main/scala/kafka/common/OffsetMetadataAndError.scala > 4cabffeacea09a49913505db19a96a55d58c0909 > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > fbc680fde21b02f11285a4f4b442987356abd17b > core/src/main/scala/kafka/server/KafkaApis.scala > 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 > core/src/main/scala/kafka/server/KafkaServer.scala > 4de812374e8fb1fed834d2be3f9655f55b511a74 > core/src/main/scala/kafka/server/OffsetManager.scala > 2957bc435102bc4004d8f100dbcdd56287c8ffae > core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala > cd16ced5465d098be7a60498326b2a98c248f343 > core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala > 8c5364fa97da1be09973c176d1baeb339455d319 > > Diff: https://reviews.apache.org/r/27391/diff/ > > > Testing > ------- > > > Thanks, > > Guozhang Wang > >