> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/Log.scala, lines 50-62
> > <https://reviews.apache.org/r/24676/diff/3/?file=666244#file666244line50>
> >
> > Should these two classes be in ReplicaManager since they are only used
> > there?

I originally put them in ReplicaManager, but eventually decided to move them to Log since they appear in the return values of the public APIs. Let me know if you have a strong preference.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, line 28
> > <https://reviews.apache.org/r/24676/diff/3/?file=666246#file666246line28>
> >
> > fetchInfo.offset seems to be redundant with
> > startOffsetMetadata.messageOffset. Instead of keeping fetchInfo, could we
> > just keep fetchSize?

I originally did that as well; the reason for keeping fetchInfo as a whole is that when re-fetching the data in DelayedFetch later on, we have to reconstruct this fetchInfo for the readFromLocalLog() call anyway, so I just kept it along with the delayed fetch object. Let me know if you have a strong preference.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, lines 135-140
> > <https://reviews.apache.org/r/24676/diff/3/?file=666246#file666246line135>
> >
> > Should we use mapValues()?

I think mapValues can be used here: the first resulting map is only passed in as a parameter and is never changed, and the second resulting map is written to the socket without modification as well. But we could also make it a rule never to use mapValues under any circumstances, just to avoid potential risks. Let me know what you think.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, lines 213-218
> > <https://reviews.apache.org/r/24676/diff/3/?file=666249#file666249line213>
> >
> > Do we need the warning log here?

It was originally in ReplicaManager, and I have moved it here. Following the error handling guidelines, I now think it probably should not be WARN but just DEBUG. Let me know if you have any thoughts.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, lines 165-173
> > <https://reviews.apache.org/r/24676/diff/3/?file=666249#file666249line165>
> >
> > Do we need the warning log here?

Ditto below.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, line 214
> > <https://reviews.apache.org/r/24676/diff/3/?file=666252#file666252line214>
> >
> > I think it's better to explicitly use the return clause unless this is
> > the last statement in the method. For example, if later someone adds a
> > return value at the end of the method, this value will no longer be
> > returned.

Good point.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, lines 427-428
> > <https://reviews.apache.org/r/24676/diff/3/?file=666252#file666252line427>
> >
> > This seems to be an existing problem. However, we need to make sure
> > these stats are recorded only once when sending out the fetch response, not
> > every time when reading from a local log.

I moved the metrics recording to KafkaApis; this may break the layered architecture a bit, though.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 143-146
> > <https://reviews.apache.org/r/24676/diff/3/?file=666253#file666253line143>
> >
> > This seems redundant given the check in line 120.

This is actually not redundant: tryComplete() will still return false if the operation is already completed (i.e. the "completed" boolean is already set to true).


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review51353
-----------------------------------------------------------


On Aug. 21, 2014, 6:33 p.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail.
> To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
>
> (Updated Aug. 21, 2014, 6:33 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Changes include:
>
> 1. Remove the Fetch/ProduceRequestPurgatory classes and move their
> satisfaction-checking logic into DelayedFetch/Produce.
> 2. Change the base RequestPurgatory API to tryCompleteElseWatch and
> checkAndComplete.
> 3. Change the base DelayedRequest API to complete, expire, tryComplete (which
> will call complete upon success) and isCompleted.
> 4. Move the updatingReplicaLEO function from ReplicaManager into Partition.
> 5. Move appendMessageToLocalLog and readMessages from KafkaApis to
> ReplicaManager.
> 6. OffsetManager still uses a nested callback for putting offsets into the cache
> while interacting with ReplicaManager.
>
> Would like reviews on:
>
> 1. RequestPurgatory and DelayedRequest API and implementation.
> 2. DelayedFetch/Produce API and implementation.
> 3. ReplicaManager's readMessages/appendMessages API.
> 4. OffsetManager's storeOffsets API.
> 5. KafkaApis.
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12
>   core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
>   core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2
>   core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44
>   core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
>   core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
>   core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e
>   core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5
>
> Diff: https://reviews.apache.org/r/24676/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Guozhang Wang
>
>