> On March 18, 2014, 7:07 p.m., Jun Rao wrote:
> > 1. There are two unit test failures.
> > 
> > RecordAccumulatorTest. testFull
> > RecordAccumulatorTest. testLinger
> > 
> > 2. The issue with compressed message size exceeding the request size is 
> > still not resolved.
> 
> Guozhang Wang wrote:
>     Hi Jun,
>     
>     This patch is not final yet, I am still doing some debugging for issues 
> found in the system test and perf tests. Could you take another look once I 
> uploaded the final version? This version is just for bookkeeping, sorry for 
> the confusion.

These two tests do not fail any more in my latest patch, maybe related to some 
bugs I found during system tests.

When a single batch is larger than the max request size, will drop it on the 
floor and log the error.


> On March 18, 2014, 7:07 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines 
> > 169-170
> > <https://reviews.apache.org/r/18299/diff/13/?file=525728#file525728line169>
> >
> >     If we want to be a bit conservative, shouldn't we take the max of the 
> > two estimates?

According to the perf test, the min is already quite effective in reducing 
reallocation possibilities; on the other hand, more conservative means larger 
memory waste.


> On March 18, 2014, 7:07 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, 
> > lines 78-79
> > <https://reviews.apache.org/r/18299/diff/13/?file=525729#file525729line78>
> >
> >     Why do we need to mark this position?

It is not ByteBuffer.mark, rather Compressor.mark() that tells the compressor 
that one record with the size has been written. I will change the name to avoid 
confusion.


> On March 18, 2014, 7:07 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, 
> > lines 172-173
> > <https://reviews.apache.org/r/18299/diff/13/?file=525729#file525729line172>
> >
> >     This comment is still not clear to me.

Reworded again.


> On March 18, 2014, 7:07 p.m., Jun Rao wrote:
> > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala, 
> > lines 40-43
> > <https://reviews.apache.org/r/18299/diff/13/?file=525737#file525737line40>
> >
> >     Not sure how this test works. We send messages randomly to one of the 
> > four partitions, yet we only read data from partition 0. Do we need to have 
> > 4 partitions in the topic? Do we need more than 1 broker?

When the topic is created, only one partition is used. That is why this test 
works, but I agree that we do not actually needs 4 partitions and 2 servers. 
Will change that.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review37585
-----------------------------------------------------------


On March 17, 2014, 10:56 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated March 17, 2014, 10:56 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> In-place compression with
> 
> 1) Dynamic reallocation in the underlying byte buffer
> 2) Written bytes estimate to reduce reallocation probabilities
> 3) Deallocation in buffer pool following the original capacity
> 
> 
> Diffs
> -----
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1ac69436f117800815b8d50f042e9e2a29364b43 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 32e12ad149f6d70c96a498d0a390976f77bf9e2a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
>  5bed60730ea108684bea2440af5a008feb0ada61 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  7a03f389cc6a4d56152b882555d7498af9a04d20 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   
> clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 
> 3ebbb804242be6a001b3bae6524afccc85a87602 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
> 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 
> f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 0c6b3656375721a718fb4de10118170aacce0ea9 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
> b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java 
> ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 
> 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
>   core/src/main/scala/kafka/producer/ConsoleProducer.scala 
> dd39ff22c918fe5b05f04582b748e32349b2055f 
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
> PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> c002f5ea38ece66ad559fadb18ffaf40ac2026aa 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 
>   perf/src/main/scala/kafka/perf/ProducerPerformance.scala 
> f12a45becb11a8bed586024866235b268630fec6 
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> integration tests
> 
> unit tests
> 
> stress tests (1K message size, 1M messages in producer performance with ack = 
> 1, linger time = 0ms/500ms, random bit/all-ones messages)
> 
> snappy dynamic load test
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>

Reply via email to