-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review37311
-----------------------------------------------------------


1. Compilation error when running unit tests.
/Users/jrao/Intellij/kafka_gradle/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:135:
 Missing closing brace `}' assumed here
object ProducerCompressionTest {
^
one error found
 FAILED

2. I thought about another corner case. Now that MemoryRecords can expand, it's 
possible for a compressed MemoryRecords to be larger than the max produce 
request size. When this happens, we will never be able to drain that 
MemoryRecords when calling RecordAccumulator.drain(). I'm not sure what the 
best way to address this is.
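
One possible direction, sketched below under stated assumptions (the method 
shape is illustrative, not the actual RecordAccumulator API): always drain at 
least one batch per node, even when that batch alone exceeds the max request 
size, so an oversized compressed batch cannot sit in the accumulator forever.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not the real RecordAccumulator.drain() logic.
class DrainSketch {
    // Drain batches (represented here by their sizes in bytes) up to
    // maxRequestSize, but always take the first batch even if it is
    // oversized, so an over-limit batch can still leave the accumulator.
    static List<Integer> drain(List<Integer> batchSizes, int maxRequestSize) {
        List<Integer> drained = new ArrayList<Integer>();
        int total = 0;
        for (int size : batchSizes) {
            if (!drained.isEmpty() && total + size > maxRequestSize)
                break;
            drained.add(size);
            total += size;
        }
        return drained;
    }
}
```

The broker's max message size would still have to be considered separately; 
this only avoids the producer stalling.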


clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
<https://reviews.apache.org/r/18299/#comment68771>

    Wouldn't capacity() << 1 always be >= capacity() + 1?
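
A small sketch of the arithmetic (names are illustrative, not the actual 
ByteBufferOutputStream code): for any capacity >= 1, capacity << 1 is already 
>= capacity + 1, so a max against capacity + 1 is redundant; a max only 
matters against the number of bytes the pending write actually needs.

```java
// Illustrative expansion rule, not the real ByteBufferOutputStream code:
// double the capacity, but never grow less than the pending write requires.
class BufferGrowSketch {
    static int nextCapacity(int capacity, int bytesNeeded) {
        return Math.max(capacity << 1, capacity + bytesNeeded);
    }
}
```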



clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment68772>

    Shouldn't we pass in bufferSize instead of 512?



clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment68773>

    Should we use double to get more precision?



clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment68823>

    We can probably call Compressor.putRecord() directly, since when key and 
value are null, Record.write and Compressor.putRecord do the same thing. 
Currently, it's a bit weird that Compressor calls Record.write, which then 
calls putRecord back in Compressor.



clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment68774>

    Should we avoid this calculation when the compression type is None? 
Floating-point division could be a bit expensive.
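
For illustration, a hedged sketch of the guard being suggested (the enum and 
names here are assumptions, not the real Compressor API):

```java
// Illustrative only: skip the floating-point estimate entirely when no
// compression is in use.
class EstimateSketch {
    enum Codec { NONE, GZIP, SNAPPY }

    static long estimatedBytes(Codec codec, long uncompressed, float rate) {
        if (codec == Codec.NONE)
            return uncompressed;              // no float math on this path
        return (long) (uncompressed * rate);  // rate ~= compressed / uncompressed
    }
}
```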



clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment68817>

    Could we name this estimatedBytesWritten()?



clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment68818>

    The latter estimate seems more accurate. Could you explain why we need the 
first estimate?



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment68781>

    It seems that in all callers, writable is always true. 



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment68783>

    Is this comment still valid?



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment68782>

    Could we call this isFull()?



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment68821>

    I'm not sure I understand the comment about depending on remaining.



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment68820>

    rec.limit() is probably only needed in the uncompressed case.



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment68822>

    has => have, new compressed => new uncompressed



clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
<https://reviews.apache.org/r/18299/#comment68824>

    Could we just merge these into a single parameterized unit test?
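
Something like the following data-driven shape (the codec list and the 
roundTrip helper are placeholders, not the actual test code) would cover all 
codecs in one body:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: one test body looping over the compression cases,
// rather than a near-identical method per codec.
class ParameterizedSketch {
    static List<String> failures(String[] codecs) {
        List<String> failed = new ArrayList<String>();
        for (String codec : codecs)
            if (!roundTrip(codec))   // shared test body, run once per codec
                failed.add(codec);
        return failed;
    }

    // Placeholder for: write records with 'codec', read back, compare.
    static boolean roundTrip(String codec) {
        return codec != null && !codec.isEmpty();
    }
}
```

JUnit's Parameterized runner would be another way to get the same effect 
while keeping each codec a separately reported case.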



clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
<https://reviews.apache.org/r/18299/#comment68825>

    Instead of storing keyArray, could we just use this.key.array()?



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
<https://reviews.apache.org/r/18299/#comment68826>

    Could we just use a single consumer?



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
<https://reviews.apache.org/r/18299/#comment68827>

    Instead of duplicating the input messages, perhaps we could store them in 
an array initially?


- Jun Rao


On March 15, 2014, 12:40 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated March 15, 2014, 12:40 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> In-place compression with
> 
> 1) Dynamic reallocation in the underlying byte buffer
> 2) Written bytes estimate to reduce reallocation probabilities
> 3) Deallocation in buffer pool following the original capacity
> 
> 
> Diffs
> -----
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1ac69436f117800815b8d50f042e9e2a29364b43 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 32e12ad149f6d70c96a498d0a390976f77bf9e2a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
>  5bed60730ea108684bea2440af5a008feb0ada61 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  7a03f389cc6a4d56152b882555d7498af9a04d20 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   
> clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 
> 3ebbb804242be6a001b3bae6524afccc85a87602 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
> 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 
> f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 0c6b3656375721a718fb4de10118170aacce0ea9 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
> b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java 
> ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 
> 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
>   core/src/main/scala/kafka/producer/ConsoleProducer.scala 
> dd39ff22c918fe5b05f04582b748e32349b2055f 
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
> PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> c002f5ea38ece66ad559fadb18ffaf40ac2026aa 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 
>   perf/src/main/scala/kafka/perf/ProducerPerformance.scala 
> f12a45becb11a8bed586024866235b268630fec6 
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> integration tests
> 
> unit tests
> 
> stress tests (1K message size, 1M messages in producer performance with ack = 
> 1, linger time = 0ms/500ms, random bit/all-ones messages)
> 
> snappy dynamic load test
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
