-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57235
-----------------------------------------------------------

Very nicely done. These are all minor comments, except for one about emptying the producer request, which should be easy to fix if it turns out to be an issue. (It is the top comment.)


core/src/main/scala/kafka/api/ProducerRequest.scala
<https://reviews.apache.org/r/24676/#comment98557>

I have a concern that this may actually still be needed. See the comment under handleProducerRequest.sendResponseCallback.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/24676/#comment97784>

Maybe use this: "Recorded replica %d log end offset (LEO)..."

Also, instead of an explicit [%s,%d] format specifier, I think we should start doing the following:
"%s".format(TopicAndPartition(topic, partition))

That way we ensure a canonical toString for topic/partition pairs and can change it in one place in the future. There are some places where we don't log with this agreed-upon format and it is a bit annoying, so going forward I think we should use the above. Can we add it to the logging improvements wiki?


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/24676/#comment97788>

Since we still may update the HW, shall we rename this to maybeUpdateHWAndExpandIsr?


core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/24676/#comment97797>

Since this contains hw (which is a replication detail), should it really be in the replica manager?


core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97809>

How about just calling this responseCallback? It is slightly confusing to see references to callbackOnComplete and onComplete in the same class.


core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97801>

The earlier comment was useful.
i.e., (in which case we return whatever data is available for the partitions that are currently led by this broker)


core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97805>

I'm a bit confused by case C. It can also happen if the delayed fetch happens to straddle a segment roll event; the comment seems a bit misleading/incomplete without that. In fact, if it is lagging, shouldn't it have been satisfied immediately without having to create a DelayedFetch in the first place?


core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/24676/#comment98139>

Similar comment as in DelayedFetch on naming this.


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98165>

Why is this additional logging necessary? KafkaApis currently has a catch-all for unhandled exceptions. Error codes can be inspected via public access logs if required, right?


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98166>

Same here.


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98558>

I'm not sure how Scala treats this under the hood, but it _has_ to hold a reference to request until the callback is executed. I.e., we probably still want to empty the request data.


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98180>

to fetch messages


core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98182>

Are these changes intentional?


core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment98184>

commitStatusView

Also, can we just compute the final status once at the end, as opposed to preparing an initial response status and modifying it later?

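
For the commitStatusView comment above, here is a minimal sketch of what computing the status once could look like. It is not the code in this patch: `CommitStatusSketch`, `CommitOutcome`, `finalCommitStatus`, and the `maxMetadataLength` parameter are hypothetical names, and `TopicAndPartition` is a simplified stand-in for the real class.

```scala
// Hypothetical sketch; none of these names come from the patch.
object CommitStatusSketch {
  // Simplified stand-in for the topic/partition pair used as the map key.
  case class TopicAndPartition(topic: String, partition: Int)

  // Stand-ins for the error codes a commit response would carry.
  sealed trait CommitOutcome
  case object Committed extends CommitOutcome
  case object MetadataTooLarge extends CommitOutcome
  case object AppendFailed extends CommitOutcome

  // Build the per-partition status in a single pass, after the log append
  // result is known, instead of creating an initial status map and
  // modifying it later.
  def finalCommitStatus(
      offsets: Map[TopicAndPartition, String], // partition -> commit metadata
      appendSucceeded: Boolean,
      maxMetadataLength: Int): Map[TopicAndPartition, CommitOutcome] =
    offsets.map { case (tp, metadata) =>
      val outcome: CommitOutcome =
        if (metadata.length > maxMetadataLength) MetadataTooLarge
        else if (appendSucceeded) Committed
        else AppendFailed
      tp -> outcome
    }
}
```

The result is one immutable map that could be handed to the response callback directly, so no intermediate status needs to be kept or patched up.
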

core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment98194>

Do you think it would be clearer to name this onAppend or something similar?


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment97799>

Should we rename ReplicaManager to ReplicatedLogManager?


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment98372>

(for regular consumer fetch)


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment98380>

This is old code and we don't need to address it in this patch, but I was wondering if it makes sense to respond sooner if there is at least one error in the local append. What do you think? I.e., I don't remember a good reason for holding on to the request if there are i < numPartitions errors in local append.


core/src/main/scala/kafka/utils/DelayedItem.scala
<https://reviews.apache.org/r/24676/#comment98553>

We don't really need this class anymore and it can be folded into DelayedRequest, right?


core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/24676/#comment98554>

weird comment


- Joel Koshy


On Oct. 17, 2014, 4:56 p.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
>
> (Updated Oct. 17, 2014, 4:56 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Incorporate Jun's comments round two after rebase
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c
>   core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
>   core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4
>   core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2
>   core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
>   core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
>   core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5
>
> Diff: https://reviews.apache.org/r/24676/diff/
>
>
> Testing
> -------
>
> Unit tests
>
>
> Thanks,
>
> Guozhang Wang
>
>