> On March 24, 2014, 11:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java,
> >  lines 246-251
> > <https://reviews.apache.org/r/18299/diff/16/?file=533373#file533373line246>
> >
> >     We can't just drop those records on the floor since we need to inform 
> > the client about the failure. Perhaps we could just form a request with a 
> > single RecordBatch and send it. The request will fail and the client will 
> > be notified.

I agree, and in the long run we probably should back-off the batch size.

I think that before we eventually fix this issue, currently we would probably 
just log it when this issue happens and move on. Throwing an runtime exception 
is generally doing the same as it does not necessarily stop the producer, but 
throwing it has the side-effect of undefined behavior of this sender iteration. 
So I decided to just log it and let the producer proceed. Let me know if you 
have any preferences.


> On March 24, 2014, 11:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines 
> > 187-188
> > <https://reviews.apache.org/r/18299/diff/16/?file=533379#file533379line187>
> >
> >     So, we are switching to a static estimated compression rate? Will that 
> > be effective in terms of reducing re-allocation rate?

We still update the compression rate in close().


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review38386
-----------------------------------------------------------


On March 23, 2014, 12:55 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated March 23, 2014, 12:55 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Jay's comments
> 
> In-place compression with
> 
> 1) Dynamic reallocation in the underlying byte buffer
> 2) Written bytes estimate to reduce reallocation probabilities
> 3) Deallocation in buffer pool following the original capacity
> 
> 
> Diffs
> -----
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1ac69436f117800815b8d50f042e9e2a29364b43 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 32e12ad149f6d70c96a498d0a390976f77bf9e2a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
>  b69866a9fb9a8b4e1e78d304a20eda3cbf178c6f 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  673b2962771c28ceb3c7a6c0fd6f69521bd7ed16 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   
> clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 
> 3ebbb804242be6a001b3bae6524afccc85a87602 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
> 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 
> f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Crc32.java 
> 153c5a6d345293aa0ba2cf513373323a6e9f2467 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 0c6b3656375721a718fb4de10118170aacce0ea9 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
> b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java 
> ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 
> 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
>   core/src/main/scala/kafka/producer/ConsoleProducer.scala 
> dd39ff22c918fe5b05f04582b748e32349b2055f 
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
> PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> c002f5ea38ece66ad559fadb18ffaf40ac2026aa 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 
>   perf/src/main/scala/kafka/perf/ProducerPerformance.scala 
> 3df0d130308a35fca96184adc21eeee2eea1488f 
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> shallow cluster tests with rolling bounces
> 
> integration tests
> 
> unit tests
> 
> stress tests (1K message size, 1M messages in producer performance with ack = 
> 1, linger time = 0ms/500ms, random bit/all-ones messages)
> 
> snappy dynamic load test
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>

Reply via email to