-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/14730/#review27309
-----------------------------------------------------------

core/src/main/scala/kafka/server/AbstractFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment53135>

    This method can be written in a clearer way. There is code there that does the equivalent of collection.groupBy. If we change the input to a set, we can first make a pass to compute the fetcherId and then use collection.groupBy to group the set by BrokerFetcherId. Finally, we can add the set per BrokerFetcherId to the fetcherThreadMap and call addPartitions.


core/src/main/scala/kafka/server/AbstractFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment53136>

    This needs to be protected by the mapLock. For simplicity, we can probably just hold the lock for the whole logic. Adding all partitions should be cheap and there is no strong reason to optimize that.


core/src/main/scala/kafka/server/AbstractFetcherManager.scala
<https://reviews.apache.org/r/14730/#comment53138>

    This can be written as fetcherThreadMap(brokerAndFetcherId).


core/src/main/scala/kafka/server/AbstractFetcherThread.scala
<https://reviews.apache.org/r/14730/#comment53141>

    I recommend that we hold the lock for the whole method. This may not be necessary for the logic we have now. However, this may change if the logic evolves in the future. Also, iterating over an in-memory data structure should be cheap, so minimizing the time the lock is held is not necessary.


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53142>

    Hmm, I don't see the code for marking them as stopped?


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53145>

    I think we need to move replicaStateChangeLock to here. Otherwise, the checking and the updating of controllerEpoch may not be atomic.


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53144>

    Could we use a better val name? Something like leaderPartitionInfos?

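
    To make the AbstractFetcherManager suggestions above concrete, here is a rough sketch of the groupBy restructuring with the lock held for the whole method. It is only an illustration, not the patch itself: every type below (Broker, TopicAndPartition, BrokerAndFetcherId, BrokerAndInitialOffset, FetcherThreadStub, FetcherManagerSketch) is a hypothetical stand-in, the hashing in getFetcherId is an assumption, and the input is taken as a Map rather than a set.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the real Kafka types; names and fields are assumptions.
case class Broker(id: Int, host: String, port: Int)
case class TopicAndPartition(topic: String, partition: Int)
case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)

// Stub fetcher thread that only records the partitions it was given.
class FetcherThreadStub(val name: String) {
  private val partitionMap = new mutable.HashMap[TopicAndPartition, Long]

  def addPartitions(offsets: Map[TopicAndPartition, Long]): Unit =
    partitionMap.synchronized { partitionMap ++= offsets }

  def partitionCount: Int = partitionMap.synchronized { partitionMap.size }
}

class FetcherManagerSketch(numFetchers: Int) {
  private val mapLock = new Object
  private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, FetcherThreadStub]

  // Assumed hashing scheme; floorMod keeps the result in [0, numFetchers).
  private def getFetcherId(topic: String, partitionId: Int): Int =
    java.lang.Math.floorMod(31 * topic.hashCode + partitionId, numFetchers)

  def addFetcherForPartitions(
      partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]): Unit =
    mapLock.synchronized { // hold the lock for the whole logic
      // One pass: compute the fetcher id per partition and group by it.
      val partitionsPerFetcher = partitionAndOffsets.groupBy { case (tp, brokerAndOffset) =>
        BrokerAndFetcherId(brokerAndOffset.broker, getFetcherId(tp.topic, tp.partition))
      }
      // Then add each group to its fetcher thread, creating the thread if needed.
      for ((brokerAndFetcherId, offsets) <- partitionsPerFetcher) {
        val thread = fetcherThreadMap.getOrElseUpdate(
          brokerAndFetcherId,
          new FetcherThreadStub(
            s"fetcher-${brokerAndFetcherId.fetcherId}-${brokerAndFetcherId.broker.id}"))
        thread.addPartitions(offsets.map { case (tp, bo) => tp -> bo.initOffset })
      }
    }

  def fetcherCount: Int = mapLock.synchronized { fetcherThreadMap.size }

  def partitionCount: Int =
    mapLock.synchronized { fetcherThreadMap.values.map(_.partitionCount).sum }
}
```

    Holding mapLock around both the grouping and the map update keeps the method simple; since adding partitions is cheap, there is little to gain from a narrower critical section.
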

core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53143>

    Could we use named fields instead of _2?


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53148>

    Can this logic be moved to becomeLeaderOrFollower since it's duplicated in becomeFollower()?


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53146>

    Since we serialize the processing of all requests from the controller, we should never hit the code here. So, we should change the logging to WARN and change the logging message accordingly.


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/14730/#comment53149>

    Are we expecting any exceptions here? If so, what about the error code in the response?


- Jun Rao


On Oct. 21, 2013, 8:35 p.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/14730/
> -----------------------------------------------------------
>
> (Updated Oct. 21, 2013, 8:35 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1001
>     https://issues.apache.org/jira/browse/KAFKA-1001
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Incorporated review comments
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/cluster/Partition.scala 5ccecd179d33abfc14dcefc35dd68de7474c6978
>   core/src/main/scala/kafka/common/ErrorMapping.scala 153bc0b078d21200c02c47dd5ad9b7a7e3326ec4
>   core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 566ca46d113ee7da4b38ee57302ba183b59ab5d6
>   core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala dda0a8f041f242bf8a501a8cbd2b9c0258323f96
>   core/src/main/scala/kafka/log/LogManager.scala 47197153c5d3797d2e2a2f9539d9cd55501468e3
>   core/src/main/scala/kafka/server/AbstractFetcherManager.scala 15b7bd31446ffb97b8ed0fa6461649a01d81c7e9
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala c64260f12bdd6b6c964875e1f3873156442e44e1
>   core/src/main/scala/kafka/server/ReplicaManager.scala ee1cc0cf451b691eb91d9158ca765aeb60fc3dc8
>
> Diff: https://reviews.apache.org/r/14730/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Guozhang Wang
>
>