-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57235
-----------------------------------------------------------


Very nicely done.

These are all minor comments, apart from one about emptying the producer 
request, which should be easy to fix if it is an issue. (It is the top comment.)


core/src/main/scala/kafka/api/ProducerRequest.scala
<https://reviews.apache.org/r/24676/#comment98557>

    I have a concern that this may actually still be needed. See the comment 
under handleProducerRequest.sendResponseCallback.



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/24676/#comment97784>

    Maybe use this:
    "Recorded replica %d log end offset (LEO)..."
    
    Also, instead of an explicit [%s,%d] format specifier I think we should 
start doing the following:
    
    "%s".format(TopicAndPartition(topic, partition))
    
    That way we ensure a canonical toString for topic/partition pairs and can 
change it in one place in the future.
    
    There are some places where we don't log with this agreed-upon format and 
it is a bit annoying, so going forward I think we should use the above. Can we 
add it to the logging improvements wiki?
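
    A minimal sketch of the idea, assuming a simplified stand-in for 
TopicAndPartition (the real class may differ) and a made-up log message; the 
object and method names are hypothetical:

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition; the real class may differ.
case class TopicAndPartition(topic: String, partition: Int) {
  // The one canonical rendering: change it here and every log line follows.
  override def toString: String = "[%s,%d]".format(topic, partition)
}

object LogFormatExample {
  // Illustrative message only; it is not the message used in the patch.
  def caughtUpMessage(replicaId: Int, topic: String, partition: Int): String =
    "Replica %d caught up on partition %s".format(replicaId, TopicAndPartition(topic, partition))
}
```

    Call sites then pass the pair through %s and never spell out [%s,%d] 
themselves.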



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/24676/#comment97788>

    Since we may still update the HW, shall we rename this to 
maybeUpdateHWAndExpandIsr?



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/24676/#comment97797>

    Since this contains hw (which is a replication detail) should it really be 
in the replica manager?



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97809>

    How about just calling this responseCallback? It is slightly confusing to 
see references to callbackOnComplete and onComplete in the same class.



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97801>

    The earlier comment was useful, i.e., "(in which case we return whatever 
data is available for the partitions that are currently led by this broker)".



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97805>

    I'm a bit confused by case C. It can also happen if the delayed fetch 
happens to straddle a segment roll event; the comment seems a bit 
misleading/incomplete without that.
    
    In fact, if it is lagging shouldn't it have been satisfied immediately 
without having to create a DelayedFetch in the first place?



core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/24676/#comment98139>

    Similar comment as in DelayedFetch on naming this.



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98165>

    Why is this additional logging necessary?
    
    KafkaApis currently has a catch-all for unhandled exceptions.
    
    Error codes can be inspected via public access logs if required, right?



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98166>

    Same here.



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98558>

    I'm not sure how Scala treats this under the hood, but it _has_ to hold a 
reference to the request until the callback is executed, i.e., we probably 
still want to empty the request data.
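
    A rough sketch of the concern, using a hypothetical, simplified request 
type (FakeProducerRequest, emptyData and makeCallback are invented for 
illustration and are not the real Kafka classes):

```scala
// Hypothetical, simplified request type; not the real ProducerRequest.
class FakeProducerRequest(var data: Map[String, Array[Byte]], val correlationId: Int) {
  // Drop the payload so it can be garbage collected while the request waits.
  def emptyData(): Unit = { data = Map.empty }
}

object ClosureCaptureExample {
  def makeCallback(request: FakeProducerRequest): String => String = {
    // The closure reads request.correlationId, so the function object keeps a
    // reference to the whole request (payload included) for as long as the
    // callback itself is reachable.
    status => "correlationId=%d status=%s".format(request.correlationId, status)
  }
}
```

    Emptying the data after the local append leaves the callback usable, 
because it only needs the small header fields, while the payload no longer has 
to stay in memory for the lifetime of the delayed operation.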



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98180>

    to fetch messages



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98182>

    Are these changes intentional?



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment98184>

    commitStatusView
    
    Also, can we just compute the final status once at the end as opposed to 
preparing an initial response status and modifying later?
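
    Something along these lines, as a sketch only; the error codes, the 
metadata size check and all names here are assumptions for illustration, not 
the actual OffsetManager code:

```scala
object CommitStatusExample {
  // Hypothetical error codes, for illustration only.
  val NoError: Short = 0
  val OffsetMetadataTooLarge: Short = 12

  // Derive every partition's final status in one pass once the append result
  // is known, instead of building an initial map and patching it afterwards.
  def commitStatus(metadataByPartition: Map[String, String],
                   maxMetadataSize: Int,
                   appendError: Short): Map[String, Short] =
    metadataByPartition.map { case (partition, metadata) =>
      val code = if (metadata.length > maxMetadataSize) OffsetMetadataTooLarge else appendError
      partition -> code
    }
}
```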



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment98194>

    Do you think it would be clearer to name this onAppend or something similar?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment97799>

    Should we rename ReplicaManager to ReplicatedLogManager?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment98372>

    (for regular consumer fetch)



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment98380>

    This is old code and we don't need to address it in this patch, but I was 
wondering if it makes sense to respond sooner if there is at least one error in 
the local append. What do you think? i.e., I don't remember a good reason for 
holding on to the request if there are i < numPartitions errors in local append.
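
    To make the two options concrete, here is a small sketch. It assumes the 
local append yields one error code per partition; the names and the NoError 
value are hypothetical:

```scala
object EarlyResponseExample {
  val NoError: Short = 0 // hypothetical error code

  // Behaviour as I read it today: keep holding the request unless every
  // partition failed in the local append.
  def holdUnlessAllFailed(errors: Seq[Short]): Boolean =
    errors.count(_ != NoError) < errors.size

  // Alternative: keep holding the request only if no partition failed, so a
  // single local error triggers an immediate response.
  def holdOnlyIfNoneFailed(errors: Seq[Short]): Boolean =
    errors.forall(_ == NoError)
}
```

    With one failed partition out of two, the first predicate keeps the 
request waiting and the second responds right away.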



core/src/main/scala/kafka/utils/DelayedItem.scala
<https://reviews.apache.org/r/24676/#comment98553>

    We don't really need this class anymore and it can be folded into 
DelayedRequest, right?



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/24676/#comment98554>

    weird comment


- Joel Koshy


On Oct. 17, 2014, 4:56 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Oct. 17, 2014, 4:56 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporate Jun's comments round two after rebase
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 
> 59c09155dd25fad7bed07d3d00039e3dc66db95c 
>   core/src/main/scala/kafka/api/FetchResponse.scala 
> 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
> 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala 
> b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala 
> a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 
> 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
>   core/src/main/scala/kafka/log/Log.scala 
> 157d67369baabd2206a2356b2aa421e848adab17 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala 
> e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 
> 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
> ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
> d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 
> 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
>   core/src/main/scala/kafka/utils/DelayedItem.scala 
> d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
> fb61d552f2320fedec547400fbbe402a0b2f5d87 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
> 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
> a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
> a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
> 3804a114e97c849cae48308997037786614173fc 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
