> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java,
> >  line 58
> > <https://reviews.apache.org/r/18299/diff/14/?file=526582#file526582line58>
> >
> >     What is the reason for this check?

There is a corner case where the heuristic used to estimate the written bytes 
could return a value already larger than the capacity if we use the 
compressor's buffer size. Since I have removed this heuristic, the check may 
never trigger for now, but I think it is better to leave it here in case we 
switch to some other heuristic in the future. 


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, 
> > line 69
> > <https://reviews.apache.org/r/18299/diff/14/?file=526589#file526589line69>
> >
> >     Do we actually need this method? This is yet another place we define 
> > the message format.

This function is used in some unit tests. Besides, I think it does not 
actually access the message format, only the log entry format.


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, line 
> > 71
> > <https://reviews.apache.org/r/18299/diff/14/?file=526588#file526588line71>
> >
> >     Why isn't this done in MemoryRecords? This means we are defining the 
> > format in multiple places.

As I understand it, MemoryRecords does not actually know the format of the 
message; all it knows is the format of the LogEntry, which is

[offset, message-size, message]

Inside the message, the format is completely opaque to MemoryRecords and is 
handled in Compressor/Record:

[crc, magic, attribute, key-size, key, value-size, value]

When compression is used, we need to make the shallow message out of the 
compressed messages, and I think this logic fits naturally in Compressor.
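
To make the two layers concrete, here is a minimal sketch of how I picture 
them. The class and method names are hypothetical, and the field widths (8-byte 
offset, 4-byte sizes and crc, 1-byte magic and attribute) and the non-null 
key/value are assumptions made for illustration only, not a statement of what 
the patch does. The point is that the outer writer treats the message bytes as 
opaque:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class LogEntryLayoutSketch {
    // Assumed widths: 8-byte offset plus 4-byte message-size.
    public static final int LOG_OVERHEAD = 8 + 4;

    /** Inner layer: [crc, magic, attribute, key-size, key, value-size, value]. */
    public static byte[] encodeMessage(byte magic, byte attribute, byte[] key, byte[] value) {
        int bodySize = 1 + 1 + 4 + key.length + 4 + value.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + bodySize);
        buf.position(4);                 // leave room for the crc, filled in last
        buf.put(magic);
        buf.put(attribute);
        buf.putInt(key.length);
        buf.put(key);
        buf.putInt(value.length);
        buf.put(value);
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, bodySize);   // checksum covers everything after the crc
        buf.putInt(0, (int) crc.getValue());
        return buf.array();
    }

    /** Outer layer: [offset, message-size, message]; never looks inside the message. */
    public static void appendLogEntry(ByteBuffer out, long offset, byte[] message) {
        out.putLong(offset);
        out.putInt(message.length);
        out.put(message);
    }

    public static void main(String[] args) {
        byte[] message = encodeMessage((byte) 0, (byte) 0,
                "k".getBytes(StandardCharsets.UTF_8),
                "value".getBytes(StandardCharsets.UTF_8));
        ByteBuffer log = ByteBuffer.allocate(LOG_OVERHEAD + message.length);
        appendLogEntry(log, 0L, message);
        log.flip();
        System.out.println("offset=" + log.getLong() + " size=" + log.getInt());
    }
}
```

With compression, the bytes passed to appendLogEntry would be a shallow 
message whose value holds the compressed inner entries, which is why building 
it next to the inner-layer code seems natural to me.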


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, line 
> > 103
> > <https://reviews.apache.org/r/18299/diff/14/?file=526588#file526588line103>
> >
> >     These should not be swallowed.

The only reason I swallow them here is that an IOException should theoretically 
never be thrown: the underlying stream is backed by a byte buffer, which does 
no I/O and hence cannot throw IOException. However, the signature of the 
function declares throws IOException, which is why I swallow them here. I could 
also wrap them in the runtime KafkaException. Let me know which one you prefer.
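
For reference, the wrapping alternative would look roughly like the sketch 
below. SketchRuntimeException is a hypothetical stand-in for the runtime 
KafkaException, and putByte is an illustrative helper, not the actual method 
in the patch:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class WrapIOExceptionSketch {
    /** Hypothetical stand-in for an unchecked exception such as KafkaException. */
    public static class SketchRuntimeException extends RuntimeException {
        public SketchRuntimeException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Writes one byte; a checked IOException is rethrown unchecked instead of swallowed. */
    public static void putByte(OutputStream out, byte b) {
        try {
            out.write(b);
        } catch (IOException e) {
            // Should not happen for an in-memory stream, but if it does the caller sees it.
            throw new SketchRuntimeException("I/O exception during write to the append stream", e);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream memory = new ByteArrayOutputStream();
        putByte(memory, (byte) 42);
        System.out.println("bytes written: " + memory.size());

        // A stream that always fails, to show the error is surfaced rather than lost.
        OutputStream broken = new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                throw new IOException("simulated failure");
            }
        };
        try {
            putByte(broken, (byte) 1);
        } catch (SketchRuntimeException e) {
            System.out.println("caught: " + e.getCause().getMessage());
        }
    }
}
```

This keeps the callers free of the throws clause while still surfacing the 
error if the "impossible" case ever happens.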


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java,
> >  line 31
> > <https://reviews.apache.org/r/18299/diff/14/?file=526586#file526586line31>
> >
> >     I strongly recommend you inline all of this into Compressor. This isn't 
> > really a factory for Compressions it is an OutputStreamFactory but 
> > splitting it into another class doesn't seem to buy us much.

I did this to try to isolate the Compressor from the specific compression 
types. I can change it back by merging CompressionFactory into Compressor.


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, 
> > line 62
> > <https://reviews.apache.org/r/18299/diff/14/?file=526589#file526589line62>
> >
> >     Consider replacing this with a factory method
> >       MemoryRecords.emptyRecords(int size)
> >     
> >     The problem with boolean flags in java is that there is no named 
> > parameters so it is really hard to remember what the parameter is when you 
> > see
> >       new MemoryRecords(buffer, type, true)
> >

Great point.
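
A rough sketch of what I have in mind follows. The Records class is a 
hypothetical stand-in for MemoryRecords, the meaning of the boolean (writable 
or not) is my assumption for illustration, and readableRecords is an extra 
hypothetical factory added only to show the contrast:

```java
import java.nio.ByteBuffer;

public class FactoryMethodSketch {
    /** Hypothetical stand-in for the compression type enum. */
    public enum Compression { NONE, GZIP, SNAPPY }

    /** Hypothetical stand-in for MemoryRecords. */
    public static final class Records {
        private final ByteBuffer buffer;
        private final Compression type;
        private final boolean writable;

        // Private: callers never see the bare boolean flag.
        private Records(ByteBuffer buffer, Compression type, boolean writable) {
            this.buffer = buffer;
            this.type = type;
            this.writable = writable;
        }

        /** Named factory: an empty, writable record set of the given capacity. */
        public static Records emptyRecords(int size) {
            return new Records(ByteBuffer.allocate(size), Compression.NONE, true);
        }

        /** Named factory: a read-only view over bytes that were already written. */
        public static Records readableRecords(ByteBuffer buffer) {
            return new Records(buffer, Compression.NONE, false);
        }

        public boolean isWritable() { return writable; }
        public int capacity() { return buffer.capacity(); }
        public Compression compression() { return type; }
    }

    public static void main(String[] args) {
        Records fresh = Records.emptyRecords(1024);
        Records existing = Records.readableRecords(ByteBuffer.allocate(16));
        System.out.println(fresh.isWritable() + " " + existing.isWritable());
    }
}
```

The call site then states its intent in the method name instead of in a 
trailing true/false.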


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java,
> >  line 88
> > <https://reviews.apache.org/r/18299/diff/14/?file=526586#file526586line88>
> >
> >     ConcurrentModificationException. Make this an array of floats.

Using a list instead.


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java,
> >  line 39
> > <https://reviews.apache.org/r/18299/diff/14/?file=526586#file526586line39>
> >
> >     I am concerned about the use of DataOutputStream as it adds another 
> > layer of synchronization.

OK, removing the DataOutputStream.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review37679
-----------------------------------------------------------


On March 19, 2014, 12:11 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated March 19, 2014, 12:11 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> In-place compression with
> 
> 1) Dynamic reallocation in the underlying byte buffer
> 2) Written bytes estimate to reduce reallocation probabilities
> 3) Deallocation in buffer pool following the original capacity
> 
> 
> Diffs
> -----
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1ac69436f117800815b8d50f042e9e2a29364b43 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 32e12ad149f6d70c96a498d0a390976f77bf9e2a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
>  b69866a9fb9a8b4e1e78d304a20eda3cbf178c6f 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  673b2962771c28ceb3c7a6c0fd6f69521bd7ed16 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   
> clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 
> 3ebbb804242be6a001b3bae6524afccc85a87602 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
> 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 
> f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 0c6b3656375721a718fb4de10118170aacce0ea9 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
> b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java 
> ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 
> 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
>   core/src/main/scala/kafka/producer/ConsoleProducer.scala 
> dd39ff22c918fe5b05f04582b748e32349b2055f 
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
> PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> c002f5ea38ece66ad559fadb18ffaf40ac2026aa 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 
>   perf/src/main/scala/kafka/perf/ProducerPerformance.scala 
> 3df0d130308a35fca96184adc21eeee2eea1488f 
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> integration tests
> 
> unit tests
> 
> stress tests (1K message size, 1M messages in producer performance with ack = 
> 1, linger time = 0ms/500ms, random bit/all-ones messages)
> 
> snappy dynamic load test
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
