> On March 7, 2014, 5:58 p.m., Jun Rao wrote:
> > build.gradle, lines 40-71
> > <https://reviews.apache.org/r/18299/diff/6/?file=513281#file513281line40>
> >
> >     This should be uncommented.

The un-commentating missed my commit last time, will fix.


> On March 7, 2014, 5:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines 
> > 98-99
> > <https://reviews.apache.org/r/18299/diff/6/?file=513289#file513289line98>
> >
> >     BufferOverflowException is not an IOException. It's a RuntimeException. 
> > Is that handled properly?
> 
> Guozhang Wang wrote:
>     In my tests I observed that when writes out of the boundary IOException 
> can be thrown, but I did not observe BufferOverflowException thrown.

I re-test it with batch size = 1 and it does throw BufferOverflowException now. 
I think you are right.


> On March 7, 2014, 5:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines 
> > 125-126
> > <https://reviews.apache.org/r/18299/diff/6/?file=513289#file513289line125>
> >
> >     Do we need to log the exception? Why is this not consistent with the 
> > logging in putInt()?

I tried to centralize the exception handling using callable parameters, but 
gave up eventually finding out they do not help much in reducing LOC but 
increased extra cost of creating a new callable object each time.


> On March 7, 2014, 5:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Record.java, lines 
> > 87-91
> > <https://reviews.apache.org/r/18299/diff/6/?file=513291#file513291line87>
> >
> >     Since we always interpret the bytes as uncompressed, do we need to pass 
> > in the codec in the constructor?

You are right, like I said in comments Record construction should always write 
a record as un-compressed, i.e. a record can either be 1) un-compressed or 2) 
indicated compressed in the attribute and payload wrapping a batch of messages. 
But in some unit tests in core some times a Message with compression codec in 
attributes but the payload content are actually un-compressed is generated, and 
which would use this function. The new Record function migrated from the 
Message class in core and inherited this "property". I think we can just leave 
it here for compatibility.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review36525
-----------------------------------------------------------


On March 7, 2014, 1:49 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated March 7, 2014, 1:49 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Refactored Compression logic into Compressor
> 
> GZIP/Snappy Integration Tests
> 
> 
> Diffs
> -----
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1ac69436f117800815b8d50f042e9e2a29364b43 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> f8748960fe9a02d52f7caae675abec503df91a39 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  616e1006f21c54af8260e84a96928cb8893ceb7c 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
> 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 
> f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 0c6b3656375721a718fb4de10118170aacce0ea9 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
> b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java 
> ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 
> 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> c002f5ea38ece66ad559fadb18ffaf40ac2026aa 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> integration tests
> 
> snappy dynamic load test
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>

Reply via email to