-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18102/#review35370
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/18102/#comment65847>

    Why are we using instanceof instead of catching the individual exception 
types?
    
    Also, why are we wrapping RuntimeExceptions in KafkaException?
    
    Basically I don't think I follow the rationale here.
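
    To illustrate, here is a rough sketch of the two styles (the bodies are
hypothetical, not the actual KafkaProducer code):

    import org.apache.kafka.common.KafkaException;

    public class SendErrorStyles {
        // Stand-in for whatever the send body actually does.
        interface SendBody { void run(); }

        // Style in the patch: one catch block dispatching on instanceof,
        // wrapping everything else (even plain RuntimeExceptions).
        public static void instanceofStyle(SendBody body) {
            try {
                body.run();
            } catch (Exception e) {
                if (e instanceof IllegalArgumentException)
                    throw (IllegalArgumentException) e;
                throw new KafkaException(e);
            }
        }

        // Alternative: catch the individual types; an unexpected
        // RuntimeException then propagates unwrapped.
        public static void individualCatchStyle(SendBody body) {
            try {
                body.run();
            } catch (IllegalArgumentException e) {
                throw e; // the known case, rethrown as-is
            }
        }
    }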



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/18102/#comment65848>

    You seem to have removed the close on the accumulator; is this an accident?



clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
<https://reviews.apache.org/r/18102/#comment65849>

    Hmm, so we need to make a new AtomicReference now? This seems much, much
worse than just recreating the RecordMetadata instance, no?
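
    Roughly the contrast I have in mind (illustrative stand-ins, not the real
classes):

    import java.util.concurrent.atomic.AtomicReference;

    public class MetadataCompletion {
        // Stand-in for RecordMetadata: immutable once constructed.
        static class Metadata {
            final long offset;
            Metadata(long offset) { this.offset = offset; }
        }

        // Patch's approach: allocate an AtomicReference up front and
        // swap the completed value in later.
        private final AtomicReference<Metadata> ref =
            new AtomicReference<Metadata>();
        public void completeViaReference(long offset) {
            ref.set(new Metadata(offset));
        }

        // Alternative: just construct the immutable instance once the
        // offset is known, with no extra indirection.
        public Metadata completeByRecreating(long offset) {
            return new Metadata(offset);
        }
    }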



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/18102/#comment65850>

    I'm not sold that this refactor actually made things better. Now everything
goes through future.get(), which is needlessly synchronized, right?
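
    To make the concern concrete, a minimal sketch (not the actual RecordBatch
code):

    import java.util.concurrent.CountDownLatch;

    public class CompletionAccess {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile long offset = -1L;

        public void complete(long newOffset) {
            this.offset = newOffset;
            done.countDown();
        }

        // Future-style access: every read goes through the latch, even
        // when the caller already knows the batch completed.
        public long getViaFuture() throws InterruptedException {
            done.await();
            return offset;
        }

        // Direct access, fine once completion has been established.
        public long getDirect() {
            return offset;
        }
    }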



clients/src/main/java/org/apache/kafka/common/errors/FatalException.java
<https://reviews.apache.org/r/18102/#comment65851>

    The current logic is that all exceptions that don't implement
RetriableException are fatal, so I don't think we need this. Originally I had
thought to use RetriableException to mark an idempotent exception, but the
current usage is actually for any non-fatal exception. The rationale was: (1) we
shouldn't end up accidentally retrying if we throw an unexpected error (out of
memory, null pointer, or something), and (2) once we have the idempotent
producer, retry will always be idempotent, so modeling that in the exception
hierarchy is premature.
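
    In other words, the existing convention amounts to something like this
(canRetry is illustrative, not actual code):

    import org.apache.kafka.common.errors.RetriableException;

    public class RetryDecision {
        // Only errors explicitly marked retriable get retried; an
        // unexpected RuntimeException (null pointer, etc.) falls through
        // and is treated as fatal.
        public static boolean canRetry(Exception e, int attemptsLeft) {
            return attemptsLeft > 0 && e instanceof RetriableException;
        }
    }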



clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
<https://reviews.apache.org/r/18102/#comment65852>

    How come you are removing this? Is it not an exception that can occur on 
the server?



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/18102/#comment65854>

    Not sure I grok the changes here; we may need to chat about it...



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/18102/#comment65855>

    This package needs to match the directory; same with the other integration
test. You have the file under kafka.api.* but declare package kafka.test.*.



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/18102/#comment65862>

    Does it make sense to combine all of these? Wouldn't it be easier to just
have four test cases?
    
    Separate cases are usually easier to reason about and to understand when
they fail.



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/18102/#comment65856>

    Can you make a method for this? It can be a local method for now:
      def makeProducer(brokers, acks, fetchTimeout) or whatever...



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/18102/#comment65858>

    There is no need to catch the exception and fail; an uncaught exception
will always fail the test.



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/18102/#comment65859>

    If you expect a particular exception, use the ScalaTest utility intercept.
It looks like this:
    
    intercept[ExecutionException]{
      // do something that should throw that exception
    }
    
    Much easier to read.



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/18102/#comment65860>

    Ditto



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/18102/#comment65861>

    Ditto



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/18102/#comment65857>

    It might be simpler to do something like
    if(producer1 != null) producer1.close()
    in the tearDown() method of the test to avoid the nested try/catch.



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/18102/#comment65863>

    Would it make sense to break these into separate test methods?


- Jay Kreps


On Feb. 20, 2014, 11:45 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18102/
> -----------------------------------------------------------
> 
> (Updated Feb. 20, 2014, 11:45 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1260
>     https://issues.apache.org/jira/browse/KAFKA-1260
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Rebased on producer re-try.
> 
> Incorporated Jay's fix for KAFKA-1266
> 
> Fixed some minor issues in producer:
> 
> 1. Close accumulator on shutdown
> 
> 2. Return offset for ack=-1
> 
> 3. Categorize error handling in send call
> 
> Fixed the offset issue for ack=-1 on the server side, details in KAFKA-1255.
> 
> Four test cases included in Part II:
> 
> 1. testErrorInResponse.
> 
> 2. testNoResponse. 
> 
> 3. testSendException.
> 
> 4. testDeadBroker. Currently blocks..
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> e4bc97279585818860487a39a93b6481742b91db 
>   clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java 
> 8c776980ef1f5167fb02dda36f6ad2385bd62bbd 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
>  22d4c79bc06fb77af30ab930855632c402c5a72e 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
>  62613a3e29a7e5ffb1cc56d267793fef72857fc6 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  eb16f6d236e07b16654623606294a051531b5f58 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> e373265f19f6ec9d40b1813a1ab7e6b5b10e0acd 
>   clients/src/main/java/org/apache/kafka/common/errors/FatalException.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
>  737b7f07b16a0d6549e835c33c9dfad58008a590 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> f1e474cd53011970c6bd3db6dfacd0f12ed9ce6b 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> f88992a0cafd9639b4c5823aa80de1daf8b0eadd 
>   core/src/main/scala/kafka/api/ProducerResponse.scala 
> 06261b9136399e48a8cb75f1c88a4cbfd1217de4 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> ae2df2014a08aaa95b5eaa430684cfdb79d4f55e 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 34baa8c6c7a15bb4aa93c286604f0eb7b19cd58e 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 1c7a450651978e121376c226987b0d835f395a2a 
> 
> Diff: https://reviews.apache.org/r/18102/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
