My bad - I accidentally pushed this to Apache instead of our internal
LinkedIn repo! I will delete this.

On Thu, Jul 16, 2015 at 05:26:55AM +0000, jjko...@apache.org wrote:
> Repository: kafka
> Updated Branches:
>   refs/heads/hotfix [created] a098de48e
> 
> 
> Hot fix for LIKAFKA-3492; force offset commit/fetches to go to kafka 
> regardless of request version
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
> Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a098de48
> Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a098de48
> Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a098de48
> 
> Branch: refs/heads/hotfix
> Commit: a098de48e61bd0e713e88f3429f84cd57b5fb97d
> Parents: 9f80665
> Author: Joel Koshy <jjko...@gmail.com>
> Authored: Wed Jul 15 22:26:17 2015 -0700
> Committer: Joel Koshy <jjko...@gmail.com>
> Committed: Wed Jul 15 22:26:17 2015 -0700
> 
> ----------------------------------------------------------------------
>  core/src/main/scala/kafka/server/KafkaApis.scala        | 12 +++++++-----
>  .../test/scala/unit/kafka/server/OffsetCommitTest.scala |  2 +-
>  2 files changed, 8 insertions(+), 6 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/kafka/blob/a098de48/core/src/main/scala/kafka/server/KafkaApis.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
> b/core/src/main/scala/kafka/server/KafkaApis.scala
> index d63bc18..528d759 100644
> --- a/core/src/main/scala/kafka/server/KafkaApis.scala
> +++ b/core/src/main/scala/kafka/server/KafkaApis.scala
> @@ -159,7 +159,8 @@ class KafkaApis(val requestChannel: RequestChannel,
>        requestChannel.sendResponse(new RequestChannel.Response(request, new 
> RequestOrResponseSend(request.connectionId, response)))
>      }
>  
> -    if (offsetCommitRequest.versionId == 0) {
> +    // hot fix for LIKAFKA-3492 (do not let offset commits/fetch requests go to zookeeper)
> +    /*if (offsetCommitRequest.versionId == 0) {
>        // for version 0 always store offsets to ZK
>        val responseInfo = offsetCommitRequest.requestInfo.map {
>          case (topicAndPartition, metaAndError) => {
> @@ -181,7 +182,7 @@ class KafkaApis(val requestChannel: RequestChannel,
>        }
>  
>        sendResponseCallback(responseInfo)
> -    } else {
> +    } else {*/
>        // for version 1 and beyond store offsets in offset manager
>  
>        // compute the retention time based on the request version:
> @@ -222,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
>          offsetCommitRequest.groupGenerationId,
>          offsetData,
>          sendResponseCallback)
> -    }
> +    //}
>    }
>  
>    /**
> @@ -473,7 +474,8 @@ class KafkaApis(val requestChannel: RequestChannel,
>    def handleOffsetFetchRequest(request: RequestChannel.Request) {
>      val offsetFetchRequest = 
> request.requestObj.asInstanceOf[OffsetFetchRequest]
>  
> -    val response = if (offsetFetchRequest.versionId == 0) {
> +    // hot fix for LIKAFKA-3492 (do not let offset commits/fetch requests go to zookeeper)
> +    val response = /*if (offsetFetchRequest.versionId == 0) {
>        // version 0 reads offsets from ZK
>        val responseInfo = offsetFetchRequest.requestInfo.map( 
> topicAndPartition => {
>          val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, 
> topicAndPartition.topic)
> @@ -495,7 +497,7 @@ class KafkaApis(val requestChannel: RequestChannel,
>        })
>  
>        OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), 
> offsetFetchRequest.correlationId)
> -    } else {
> +    } else */ {
>        // version 1 reads offsets from Kafka
>        val (unknownTopicPartitions, knownTopicPartitions) = 
> offsetFetchRequest.requestInfo.partition(topicAndPartition =>
>          metadataCache.getPartitionInfo(topicAndPartition.topic, 
> topicAndPartition.partition).isEmpty
> 
> http://git-wip-us.apache.org/repos/asf/kafka/blob/a098de48/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> index 528525b..b4a882b 100755
> --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
> @@ -235,7 +235,7 @@ class OffsetCommitTest extends JUnit3Suite with 
> ZooKeeperTestHarness {
>        versionId = 0
>      )
>      assertEquals(ErrorMapping.NoError, 
> simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
> -    assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
> +    //assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
>  
>      // committed offset should exist with fetch version 0
>      assertEquals(1L, simpleConsumer.fetchOffsets(OffsetFetchRequest(group, 
> Seq(TopicAndPartition(topic, 0)), versionId = 
> 0)).requestInfo.get(topicPartition).get.offset)
> 

Reply via email to