My bad - I accidentally pushed this to the Apache repo instead of our internal LinkedIn repo! I will delete this.
On Thu, Jul 16, 2015 at 05:26:55AM +0000, jjko...@apache.org wrote: > Repository: kafka > Updated Branches: > refs/heads/hotfix [created] a098de48e > > > Hot fix for LIKAFKA-3492; force offset commit/fetches to go to kafka > regardless of request version > > > Project: http://git-wip-us.apache.org/repos/asf/kafka/repo > Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a098de48 > Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a098de48 > Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a098de48 > > Branch: refs/heads/hotfix > Commit: a098de48e61bd0e713e88f3429f84cd57b5fb97d > Parents: 9f80665 > Author: Joel Koshy <jjko...@gmail.com> > Authored: Wed Jul 15 22:26:17 2015 -0700 > Committer: Joel Koshy <jjko...@gmail.com> > Committed: Wed Jul 15 22:26:17 2015 -0700 > > ---------------------------------------------------------------------- > core/src/main/scala/kafka/server/KafkaApis.scala | 12 +++++++----- > .../test/scala/unit/kafka/server/OffsetCommitTest.scala | 2 +- > 2 files changed, 8 insertions(+), 6 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/kafka/blob/a098de48/core/src/main/scala/kafka/server/KafkaApis.scala > ---------------------------------------------------------------------- > diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala > b/core/src/main/scala/kafka/server/KafkaApis.scala > index d63bc18..528d759 100644 > --- a/core/src/main/scala/kafka/server/KafkaApis.scala > +++ b/core/src/main/scala/kafka/server/KafkaApis.scala > @@ -159,7 +159,8 @@ class KafkaApis(val requestChannel: RequestChannel, > requestChannel.sendResponse(new RequestChannel.Response(request, new > RequestOrResponseSend(request.connectionId, response))) > } > > - if (offsetCommitRequest.versionId == 0) { > + // hot fix for LIKAFKA-3492 (do not let offset commits/fetch requests go > to zookeeper) > + /*if (offsetCommitRequest.versionId == 0) { > // for version 0 
always store offsets to ZK > val responseInfo = offsetCommitRequest.requestInfo.map { > case (topicAndPartition, metaAndError) => { > @@ -181,7 +182,7 @@ class KafkaApis(val requestChannel: RequestChannel, > } > > sendResponseCallback(responseInfo) > - } else { > + } else {*/ > // for version 1 and beyond store offsets in offset manager > > // compute the retention time based on the request version: > @@ -222,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel, > offsetCommitRequest.groupGenerationId, > offsetData, > sendResponseCallback) > - } > + //} > } > > /** > @@ -473,7 +474,8 @@ class KafkaApis(val requestChannel: RequestChannel, > def handleOffsetFetchRequest(request: RequestChannel.Request) { > val offsetFetchRequest = > request.requestObj.asInstanceOf[OffsetFetchRequest] > > - val response = if (offsetFetchRequest.versionId == 0) { > + // hot fix for LIKAFKA-3492 (do not let offset commits/fetch requests go > to zookeeper) > + val response = /*if (offsetFetchRequest.versionId == 0) { > // version 0 reads offsets from ZK > val responseInfo = offsetFetchRequest.requestInfo.map( > topicAndPartition => { > val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, > topicAndPartition.topic) > @@ -495,7 +497,7 @@ class KafkaApis(val requestChannel: RequestChannel, > }) > > OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), > offsetFetchRequest.correlationId) > - } else { > + } else */ { > // version 1 reads offsets from Kafka > val (unknownTopicPartitions, knownTopicPartitions) = > offsetFetchRequest.requestInfo.partition(topicAndPartition => > metadataCache.getPartitionInfo(topicAndPartition.topic, > topicAndPartition.partition).isEmpty > > http://git-wip-us.apache.org/repos/asf/kafka/blob/a098de48/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala > ---------------------------------------------------------------------- > diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala > 
b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala > index 528525b..b4a882b 100755 > --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala > +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala > @@ -235,7 +235,7 @@ class OffsetCommitTest extends JUnit3Suite with > ZooKeeperTestHarness { > versionId = 0 > ) > assertEquals(ErrorMapping.NoError, > simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get) > - assertEquals(-1L, > simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset) > + //assertEquals(-1L, > simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset) > > // committed offset should exist with fetch version 0 > assertEquals(1L, simpleConsumer.fetchOffsets(OffsetFetchRequest(group, > Seq(TopicAndPartition(topic, 0)), versionId = > 0)).requestInfo.get(topicPartition).get.offset) >