-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review37679
-----------------------------------------------------------


High-level comment:
I'm not sure I understand the code organization strategy between MemoryRecords,
Compressor, and CompressionFactory. I'm pretty sure CompressionFactory and
Compressor should be combined. The byte format needs to be defined in a single
place, not split between two or three places. Can you think of a cleaner way to
divide these up, maybe just combining Compressor/CompressionFactory and moving
the close() logic in Compressor into MemoryRecords?


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/18299/#comment69308>

    What is the reason for this check?



clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment69313>

    I strongly recommend you inline all of this into Compressor. This isn't
really a factory for compressions; it is an OutputStream factory, and splitting
it into another class doesn't seem to buy us much.
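A minimal sketch of what inlining could look like, assuming Compressor only needs to map a compression type to a wrapping OutputStream. `CompressionKind` and `InlinedCompressor` are hypothetical stand-ins for the patch's CompressionType and Compressor, and only NONE and GZIP are shown.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-in for the patch's CompressionType enum.
enum CompressionKind { NONE, GZIP }

// Sketch: the compressor builds its own stream instead of asking a separate factory class.
final class InlinedCompressor {
    private final OutputStream appendStream;

    InlinedCompressor(OutputStream sink, CompressionKind type) throws IOException {
        this.appendStream = wrapForOutput(sink, type);
    }

    // Private helper: the single place that maps a compression type to a stream.
    private static OutputStream wrapForOutput(OutputStream sink, CompressionKind type) throws IOException {
        switch (type) {
            case NONE:
                return sink;
            case GZIP:
                return new GZIPOutputStream(sink);
            default:
                throw new IllegalArgumentException("Unknown compression type: " + type);
        }
    }

    void write(byte[] bytes) throws IOException {
        appendStream.write(bytes);
    }

    // Closing flushes any compressor trailer (e.g. the gzip footer) into the sink.
    void close() throws IOException {
        appendStream.close();
    }
}
```

Keeping `wrapForOutput` private means no other class can construct compression streams, which is most of what a separate factory would otherwise be used for.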



clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment69319>

    This isn't much damping. How about 0.9?
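Assuming the estimate is an exponentially weighted moving average of the observed compression ratio $r_t$ (compressed bytes over uncompressed bytes), a damping factor $\alpha$ weights the previous estimate $\hat r_{t-1}$:

```latex
\hat r_t = \alpha\,\hat r_{t-1} + (1 - \alpha)\,r_t, \qquad \alpha = 0.9
% Example: \hat r_{t-1} = 0.5,\; r_t = 0.9 \;\Rightarrow\; \hat r_t = 0.45 + 0.09 = 0.54
```

With $\alpha = 0.9$ a single unusual batch moves the estimate only 10% of the way toward its own ratio, so one poorly compressible batch doesn't swing the buffer-size estimate for every batch that follows.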



clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment69315>

    I am concerned about the use of DataOutputStream as it adds another layer 
of synchronization.
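For context, `java.io.DataOutputStream` declares `write(int)` and `write(byte[], int, int)` as `synchronized`. If the only thing needed is fixed-width big-endian fields, a small unsynchronized writer is enough. This is a sketch assuming a single thread appends to a batch at a time; `BigEndianWriter` is a hypothetical name.

```java
import java.io.IOException;
import java.io.OutputStream;

// Sketch: writes big-endian ints/longs straight to the wrapped stream, with no locking.
// Not thread-safe by design (shared scratch array); assumes one writer per batch.
final class BigEndianWriter {
    private final OutputStream out;
    private final byte[] scratch = new byte[8];

    BigEndianWriter(OutputStream out) {
        this.out = out;
    }

    void writeInt(int v) throws IOException {
        scratch[0] = (byte) (v >>> 24);
        scratch[1] = (byte) (v >>> 16);
        scratch[2] = (byte) (v >>> 8);
        scratch[3] = (byte) v;
        out.write(scratch, 0, 4); // one bulk write instead of four single-byte writes
    }

    void writeLong(long v) throws IOException {
        for (int i = 0; i < 8; i++) {
            scratch[i] = (byte) (v >>> (56 - 8 * i)); // most significant byte first
        }
        out.write(scratch, 0, 8);
    }
}
```

The byte order matches `ByteBuffer`'s default (big-endian), so the output is identical to what DataOutputStream would have produced.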



clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment69314>

    I don't think this is needed. Inside an if statement you should be able to
refer to the class directly; it is only loaded if that branch is taken.
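A small illustration of the lazy behaviour this relies on. The JLS (12.4.1) guarantees a class is initialized only on first active use; in practice HotSpot also resolves the reference lazily, so an optional codec missing from the classpath would only fail if its branch actually runs. `OptionalCodec` is a hypothetical stand-in for something like the Snappy stream class, and the demo checks initialization, not loading.

```java
// Sketch: a class referenced only inside an untaken branch is never initialized.
final class LazyLoadDemo {
    static boolean codecInitialized = false;

    // Hypothetical stand-in for an optional codec class.
    static final class OptionalCodec {
        static {
            codecInitialized = true; // runs on first active use of OptionalCodec
        }

        static String name() {
            return "optional-codec";
        }
    }

    static String pick(boolean useOptionalCodec) {
        if (useOptionalCodec) {
            return OptionalCodec.name(); // direct reference, no reflection needed
        }
        return "none";
    }
}
```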



clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment69317>

    This risks a ConcurrentModificationException. Make this an array of floats.
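A sketch of the array version, assuming each compression type has a small integer id that can index the array. `Codec`, `CompressionRateTable`, and the 0.9 damping value are assumptions for illustration.

```java
import java.util.Arrays;

// Hypothetical stand-in for CompressionType; the id doubles as the array index.
enum Codec {
    NONE(0), GZIP(1), SNAPPY(2);

    final int id;

    Codec(int id) {
        this.id = id;
    }
}

// Sketch: per-type compression-rate estimates in a plain float[] instead of a shared map.
final class CompressionRateTable {
    private static final float DAMPING = 0.9f; // assumed damping factor
    private final float[] rateById = new float[Codec.values().length];

    CompressionRateTable() {
        Arrays.fill(rateById, 1.0f); // start by assuming no compression
    }

    float estimate(Codec type) {
        return rateById[type.id];
    }

    // Exponentially weighted update; a racing writer can at worst drop one observation.
    void observe(Codec type, float observedRate) {
        rateById[type.id] = DAMPING * rateById[type.id] + (1 - DAMPING) * observedRate;
    }
}
```

Array element reads and writes never throw ConcurrentModificationException, and float writes are atomic in Java, so an unsynchronized race only loses an update. Cross-thread visibility isn't guaranteed either, which seems acceptable for an estimate.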



clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment69324>

    Why isn't this done in MemoryRecords? This means we are defining the format 
in multiple places.



clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment69318>

    These should not be swallowed.
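A minimal sketch of propagating instead of swallowing. `UncheckedIOException` (Java 8+) stands in for whatever unchecked exception type the client actually uses; `Closer` is a hypothetical helper.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

final class Closer {
    private Closer() {}

    // Sketch: surface close() failures (e.g. a failed final compressor flush) to the caller.
    static void closeOrThrow(OutputStream stream) {
        try {
            stream.close();
        } catch (IOException e) {
            // Rethrow unchecked so callers without "throws IOException" still see the error.
            throw new UncheckedIOException("Failed to close compression stream", e);
        }
    }
}
```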



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment69320>

    Consider making this private.



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment69323>

    You need to cap the compression block size to something reasonable. If I
use 256K batches, that doesn't mean I want you to allocate 512K of memory. There
can't possibly be value in a block size of more than 1-4K, right?
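One way to do the capping, sketched with GZIP since `GZIPOutputStream(OutputStream, int)` takes the internal buffer size directly. The 4 KB cap and the `BlockSizing` name are assumptions, not values from the patch.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

final class BlockSizing {
    // Hypothetical cap, at the top of the 1-4K range suggested in the review.
    static final int MAX_COMPRESSION_BLOCK_SIZE = 4 * 1024;

    private BlockSizing() {}

    // Block size follows the batch size but never exceeds the cap.
    static int blockSize(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batch size must be positive: " + batchSize);
        }
        return Math.min(batchSize, MAX_COMPRESSION_BLOCK_SIZE);
    }

    static OutputStream gzip(OutputStream sink, int batchSize) throws IOException {
        return new GZIPOutputStream(sink, blockSize(batchSize));
    }
}
```

With this, a 256K batch still gets a 4K compression buffer, while small batches don't over-allocate either.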



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment69321>

    Consider replacing this with a factory method
      MemoryRecords.emptyRecords(int size)
    
    The problem with boolean flags in Java is that there are no named parameters,
so it is really hard to remember what the parameter means when you see
      new MemoryRecords(buffer, type, true)
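A sketch of the factory-method shape, assuming the boolean currently marks whether the buffer is writable. `RecordsBuffer` is a hypothetical, simplified stand-in for MemoryRecords, and `readableRecords` is a hypothetical counterpart not named in the review.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for MemoryRecords; only the construction path is sketched.
final class RecordsBuffer {
    private final ByteBuffer buffer;
    private final boolean writable;

    // Private: callers must use a named factory method instead of passing a bare boolean.
    private RecordsBuffer(ByteBuffer buffer, boolean writable) {
        this.buffer = buffer;
        this.writable = writable;
    }

    // Fresh, writable buffer of the given size (the name suggested above).
    static RecordsBuffer emptyRecords(int size) {
        return new RecordsBuffer(ByteBuffer.allocate(size), true);
    }

    // Hypothetical counterpart for wrapping already-written data for reading.
    static RecordsBuffer readableRecords(ByteBuffer buffer) {
        return new RecordsBuffer(buffer, false);
    }

    boolean isWritable() {
        return writable;
    }

    int capacity() {
        return buffer.capacity();
    }
}
```

Call sites then read as `RecordsBuffer.emptyRecords(1024)` rather than `new RecordsBuffer(buffer, true)`.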
    



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment69325>

    Do we actually need this method? This is yet another place we define the 
message format.



clients/src/main/java/org/apache/kafka/common/record/Record.java
<https://reviews.apache.org/r/18299/#comment69322>

    Please no TODOs unless we are planning on doing them in the next few weeks;
they make the code messy. I don't think we are going to break the protocol, but
if you think we should, let's file a JIRA.



clients/src/main/java/org/apache/kafka/common/utils/Utils.java
<https://reviews.apache.org/r/18299/#comment69306>

    Let's move this into Crc.java too as a static method.
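A sketch of the static form, assuming the Utils helper computes a CRC-32 over a byte range. The class name `Crc` mirrors the suggested Crc.java and is backed here by `java.util.zip.CRC32` for illustration.

```java
import java.util.zip.CRC32;

// Hypothetical sketch of Crc.java holding the checksum helper as a static method.
final class Crc {
    private Crc() {}

    // CRC-32 of bytes[offset, offset + size), returned as an unsigned value in a long.
    static long crc32(byte[] bytes, int offset, int size) {
        CRC32 crc = new CRC32();
        crc.update(bytes, offset, size);
        return crc.getValue();
    }

    static long crc32(byte[] bytes) {
        return crc32(bytes, 0, bytes.length);
    }
}
```

For example, `Crc.crc32("123456789".getBytes(StandardCharsets.US_ASCII))` gives the standard CRC-32 check value `0xCBF43926`.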



clients/src/main/java/org/apache/kafka/common/utils/Utils.java
<https://reviews.apache.org/r/18299/#comment69305>

    This should go in Crc.java as a non-static method, no?
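If the helper updates a running checksum (for example, feeding an int field into it), the non-static version might look like this. The method and class names are hypothetical, and composition over `java.util.zip.CRC32` is used for illustration.

```java
import java.util.zip.CRC32;

// Hypothetical sketch: a running checksum with an instance method for int fields.
final class RunningCrc {
    private final CRC32 crc = new CRC32();

    void update(byte[] bytes, int offset, int length) {
        crc.update(bytes, offset, length);
    }

    // Feeds the int in big-endian order, matching how it is laid out in the buffer.
    void updateInt(int v) {
        crc.update((v >>> 24) & 0xff);
        crc.update((v >>> 16) & 0xff);
        crc.update((v >>> 8) & 0xff);
        crc.update(v & 0xff);
    }

    long getValue() {
        return crc.getValue();
    }
}
```

Because it is an instance method, callers can interleave int fields and byte ranges into the same checksum without first copying them into a temporary array.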


- Jay Kreps


On March 19, 2014, 12:11 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated March 19, 2014, 12:11 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> In-place compression with
> 
> 1) Dynamic reallocation in the underlying byte buffer
> 2) Written bytes estimate to reduce reallocation probabilities
> 3) Deallocation in buffer pool following the original capacity
> 
> 
> Diffs
> -----
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1ac69436f117800815b8d50f042e9e2a29364b43
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 32e12ad149f6d70c96a498d0a390976f77bf9e2a
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b69866a9fb9a8b4e1e78d304a20eda3cbf178c6f
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 673b2962771c28ceb3c7a6c0fd6f69521bd7ed16
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 038a05a94b795ec0a95b2d40a89222394b5a74c4
>   clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 3ebbb804242be6a001b3bae6524afccc85a87602
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 906da02d02c03aadd8ab73ed2fc9a1898acb8d72
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 9d8935fa3beeb2a78b109a41ed76fd4374239560
>   clients/src/main/java/org/apache/kafka/common/record/Record.java f1dc9778502cbdfe982254fb6e25947842622239
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 0c6b3656375721a718fb4de10118170aacce0ea9
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b0745b528cef929c4273f7e2ac4de1476cfc25ad
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java ae54d67da9907b0a043180c7395a1370b3d0528d
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 36cfc0fda742eb501af2c2c0330e3f461cf1f40c
>   core/src/main/scala/kafka/producer/ConsoleProducer.scala dd39ff22c918fe5b05f04582b748e32349b2055f
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala PRE-CREATION
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala c002f5ea38ece66ad559fadb18ffaf40ac2026aa
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1
>   perf/src/main/scala/kafka/perf/ProducerPerformance.scala 3df0d130308a35fca96184adc21eeee2eea1488f
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> integration tests
> 
> unit tests
> 
> stress tests (1K message size, 1M messages in producer performance with
> ack = 1, linger time = 0ms/500ms, random bit/all-ones messages)
> 
> snappy dynamic load test
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
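The first item in the quoted description, dynamic reallocation of the underlying byte buffer, can be sketched as an OutputStream that swaps in a larger ByteBuffer when it runs out of room. `GrowableBufferOutputStream` and the 1.1 growth factor are assumptions, not the patch's ByteBufferOutputStream.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;

// Sketch: an OutputStream over a ByteBuffer that grows instead of overflowing.
final class GrowableBufferOutputStream extends OutputStream {
    private static final float GROWTH_FACTOR = 1.1f; // assumed growth factor
    private ByteBuffer buffer;

    GrowableBufferOutputStream(ByteBuffer initial) {
        this.buffer = initial;
    }

    @Override
    public void write(int b) {
        ensureRemaining(1);
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        ensureRemaining(len);
        buffer.put(bytes, off, len);
    }

    // May return a different (larger) buffer than the one passed to the constructor.
    ByteBuffer buffer() {
        return buffer;
    }

    private void ensureRemaining(int needed) {
        if (buffer.remaining() >= needed) {
            return;
        }
        int required = buffer.position() + needed;
        int newCapacity = Math.max((int) (buffer.capacity() * GROWTH_FACTOR), required);
        ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
        buffer.flip();      // expose the bytes written so far: [0, position)
        bigger.put(buffer); // copy them into the new buffer
        buffer = bigger;
    }
}
```

Since `buffer()` may hand back a buffer larger than the one taken from the pool, this is presumably why item 3 deallocates according to the original capacity rather than the final one.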
