[ 
https://issues.apache.org/jira/browse/KAFKA-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eric Olander updated KAFKA-1818:
--------------------------------
    Status: Patch Available  (was: Open)

From bb32b9bfbcc5e418cbb0b6b4e9f6aa39d5ce1345 Mon Sep 17 00:00:00 2001
From: Eric Olander <oland...@gmail.com>
Date: Sun, 14 Dec 2014 12:12:20 -0700
Subject: [PATCH] KAFKA-1818 clean up code to more idiomatic scala usage

---
 .../main/scala/kafka/utils/ReplicationUtils.scala  | 32 ++++++++--------------
 .../unit/kafka/utils/ReplicationUtilsTest.scala    | 10 +++++++
 2 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala 
b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 7157673..4b642ea 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -61,30 +61,22 @@ object ReplicationUtils extends Logging {
 
   def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, 
partition: Int):Option[LeaderIsrAndControllerEpoch] = {
     val leaderAndIsrPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, 
partition)
-    val leaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient, 
leaderAndIsrPath)
-    val leaderAndIsrOpt = leaderAndIsrInfo._1
-    val stat = leaderAndIsrInfo._2
-    leaderAndIsrOpt match {
-      case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, 
leaderAndIsrPath, stat)
-      case None => None
-    }
+    val (leaderAndIsrOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, 
leaderAndIsrPath)
+    leaderAndIsrOpt.flatMap(leaderAndIsrStr => 
parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
   }
 
   private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: 
Stat)
       : Option[LeaderIsrAndControllerEpoch] = {
-    Json.parseFull(leaderAndIsrStr) match {
-      case Some(m) =>
-        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
-        val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
-        val epoch = 
leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
-        val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
-        val controllerEpoch = 
leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
-        val zkPathVersion = stat.getVersion
-        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for 
leaderAndIsrPath %s".format(leader, epoch,
-          isr.toString(), zkPathVersion, path))
-        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, 
zkPathVersion), controllerEpoch))
-      case None => None
-    }
+    Json.parseFull(leaderAndIsrStr).flatMap{m =>
+      val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
+      val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
+      val epoch = 
leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
+      val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
+      val controllerEpoch = 
leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
+      val zkPathVersion = stat.getVersion
+      debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for 
leaderAndIsrPath %s".format(leader, epoch,
+        isr.toString(), zkPathVersion, path))
+      Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, 
zkPathVersion), controllerEpoch))}
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala 
b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 84e0855..305498a 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.utils
 
+import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.server.{ReplicaFetcherManager, KafkaConfig}
 import kafka.api.LeaderAndIsr
 import kafka.zk.ZooKeeperTestHarness
@@ -42,6 +43,8 @@ class ReplicationUtilsTest extends JUnit3Suite with 
ZooKeeperTestHarness {
   val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 
1,
     "versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2)))
 
+  val topicDataLeaderIsrAndControllerEpoch = 
LeaderIsrAndControllerEpoch(LeaderAndIsr(1,leaderEpoch,List(1,2),0), 
controllerEpoch)
+
 
   override def setUp() {
     super.setUp()
@@ -92,4 +95,11 @@ class ReplicationUtilsTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     assertEquals(newZkVersion3,-1)
   }
 
+  @Test
+  def testGetLeaderIsrAndEpochForPartition() {
+    val leaderIsrAndControllerEpoch = 
ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partitionId)
+    assertEquals(topicDataLeaderIsrAndControllerEpoch, 
leaderIsrAndControllerEpoch.get)
+    assertEquals(None, 
ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partitionId 
+ 1))
+  }
+
 }
-- 
1.8.0



> Code cleanup in ReplicationUtils including unit test
> ----------------------------------------------------
>
>                 Key: KAFKA-1818
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1818
>             Project: Kafka
>          Issue Type: Improvement
>          Components: replication
>    Affects Versions: 0.8.1.1
>            Reporter: Eric Olander
>            Assignee: Neha Narkhede
>            Priority: Trivial
>
> The code in getLeaderIsrAndEpochForPartition() and parseLeaderAndIsr() was
> essentially reimplementing the flatMap function of the Option type. The
> attached patch refactors that code into more idiomatic Scala and adds a
> unit test covering the affected code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to