-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review36658
-----------------------------------------------------------


Could we add the new compression property to existing tools like 
consoleProducer and ProducerPerformance that support compressed input?


clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment67707>

    Could the init be part of the constructor?



clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment67691>

    Would it be better to write the code as:
    
     catch (Exception e) {
      if (e instanceof IOException || e instanceof IOException) {
        // do sth
      }
    
    This avoids duplicating the code and we can just say I/O exception in the 
log.



clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment67692>

    Do we need the input to be final?



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment67703>

    If we have a separate init(), every caller has to remember calling this 
after the construction of MemoryRecord. Is this worth doing lazy init for 
compressor? If it's really worth doing, perhaps we can do the lazy init in each 
of the func (e.g. append) that needs the compressor.



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment67696>

    Do we need to wrap it or just let it pass through?



clients/src/main/java/org/apache/kafka/common/record/Record.java
<https://reviews.apache.org/r/18299/#comment67708>

    It's still weird to always write the data as uncompressed. In RecordTest, 
the compressed type is not really being exercised since the record is created 
as uncompressed. Perhaps we should just change the unit test. If the 
compression type is not none, we will have to decompress the value to get the 
uncompressed content.



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
<https://reviews.apache.org/r/18299/#comment67699>

    Could we also verify the content of the fetched messages?



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
<https://reviews.apache.org/r/18299/#comment67701>

    Do we actually hit the Overflow exception with this partition size?



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
<https://reviews.apache.org/r/18299/#comment67702>

    Could we also verify the content of the fetched messages?


- Jun Rao


On March 8, 2014, 12:36 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated March 8, 2014, 12:36 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporate Jun's comments round two
> 
> Refactored compression logic into Compressor
> 
> GZIP/Snappy integration tests
> 
> 
> Diffs
> -----
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1ac69436f117800815b8d50f042e9e2a29364b43 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 32e12ad149f6d70c96a498d0a390976f77bf9e2a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  616e1006f21c54af8260e84a96928cb8893ceb7c 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
> 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 
> f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 0c6b3656375721a718fb4de10118170aacce0ea9 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
> b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java 
> ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 
> 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
> PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> c002f5ea38ece66ad559fadb18ffaf40ac2026aa 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> integration tests
> 
> snappy dynamic load test
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>

Reply via email to