> On March 10, 2014, 5:23 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines 
> > 100-101
> > <https://reviews.apache.org/r/18299/diff/7/?file=514016#file514016line100>
> >
> >     Would it be better to write the code as:
> >     
> >      catch (Exception e) {
> >       if (e instanceof IOException || e instanceof BufferOverflowException) {
> >         // do sth
> >       }
> >     }
> >     
> >     This avoids duplicating the code and we can just say I/O exception in 
> > the log.
I would personally prefer the current code layout, since 1) IOException and 
BufferOverflowException are actually for different cases: the first is not 
expected while the other is, and we handle IOException here only so that it is 
not exposed in MemoryRecords; 2) if we catch Exception, then in the "else" 
branch we would have to re-throw, which would force the function to be 
declared as throws Exception.
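
To make the comparison concrete, the layout I am defending looks roughly like
this (a sketch with illustrative names, not the exact Compressor code;
RuntimeException stands in for Kafka's own unchecked exception type):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.BufferOverflowException;

    public class TwoCatchSketch {
        private final DataOutputStream appendStream;

        public TwoCatchSketch(DataOutputStream appendStream) {
            this.appendStream = appendStream;
        }

        public void putLong(final long value) {
            try {
                appendStream.writeLong(value);
            } catch (IOException e) {
                // not expected here; wrap it so MemoryRecords never exposes IOException
                throw new RuntimeException("I/O error writing to the append stream", e);
            } catch (BufferOverflowException e) {
                // expected when the backing buffer fills up; handled in place,
                // so nothing forces a 'throws Exception' on this method
                handleOverflow();
            }
        }

        private void handleOverflow() {
            // hypothetical recovery hook, e.g. expand the backing buffer
        }
    }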


> On March 10, 2014, 5:23 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines 
> > 111-112
> > <https://reviews.apache.org/r/18299/diff/7/?file=514016#file514016line111>
> >
> >     Do we need the input to be final?

Not necessarily; I put them there just to make sure the inputs are not altered 
in any case. Do you think there is a reason we should remove them?
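
For context, this is all the final modifier buys us here (an illustrative
method, not the actual Compressor code):

    import java.nio.ByteBuffer;

    public class FinalParamSketch {
        private final ByteBuffer buffer = ByteBuffer.allocate(64);

        // 'final' on a parameter only forbids reassignment inside the method body
        public void putInt(final int value) {
            // value = 0;  // would not compile: cannot assign to a final parameter
            buffer.putInt(value);
        }
    }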


> On March 10, 2014, 5:23 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, 
> > lines 50-53
> > <https://reviews.apache.org/r/18299/diff/7/?file=514017#file514017line50>
> >
> >     If we have a separate init(), every caller has to remember calling this 
> > after the construction of MemoryRecord. Is this worth doing lazy init for 
> > compressor? If it's really worth doing, perhaps we can do the lazy init in 
> > each of the func (e.g. append) that needs the compressor.

I think the lazy init is actually necessary, not just more efficient: doing 
the init for compression re-writes the header, which we do not want when we 
are only reading a MemoryRecords. I can move the init() into Compressor and 
check it on every call, though.
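
The per-call check I have in mind would look roughly like this (illustrative
names, not the actual MemoryRecords/Compressor code):

    public class LazyInitSketch {
        private boolean writeInitialized = false;

        public void append(long offset, byte[] key, byte[] value) {
            // init lazily on the write path only, so an instance that is
            // only ever read never has its header re-written
            if (!writeInitialized) {
                initCompressor();
                writeInitialized = true;
            }
            // ... write the record through the compressor ...
        }

        private void initCompressor() {
            // hypothetical: writes the compressed wrapper header once
        }
    }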


> On March 10, 2014, 5:23 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, 
> > lines 178-179
> > <https://reviews.apache.org/r/18299/diff/7/?file=514017#file514017line178>
> >
> >     Do we need to wrap it or just let it pass through?

If this is the only place that could throw BufferUnderflowException in 
consumers/mirror makers, then we probably do not need to wrap it; otherwise I 
would prefer to indicate where the exception is thrown. How about keeping the 
wrapping here for now and removing it later if it turns out to be really 
unnecessary?
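
The wrapping in question amounts to something like this (a sketch with
illustrative names; the real code and message differ):

    import java.nio.BufferUnderflowException;
    import java.nio.ByteBuffer;

    public class UnderflowWrapSketch {
        private final ByteBuffer buffer;

        public UnderflowWrapSketch(ByteBuffer buffer) {
            this.buffer = buffer;
        }

        public long nextOffset() {
            try {
                return buffer.getLong();  // underflows on a truncated buffer
            } catch (BufferUnderflowException e) {
                // re-throw with context so consumers/mirror makers can tell
                // the underflow came from reading MemoryRecords
                throw new RuntimeException("Buffer underflow while reading MemoryRecords", e);
            }
        }
    }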


> On March 10, 2014, 5:23 p.m., Jun Rao wrote:
> > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala, 
> > lines 145-146
> > <https://reviews.apache.org/r/18299/diff/7/?file=514024#file514024line145>
> >
> >     Do we actually hit the Overflow exception with this partition size?

Yes; I have confirmed with trace logging that the tests hit the overflow with 
this partition size.


> On March 10, 2014, 5:23 p.m., Jun Rao wrote:
> > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala, 
> > lines 119-121
> > <https://reviews.apache.org/r/18299/diff/7/?file=514024#file514024line119>
> >
> >     Could we also verify the content of the fetched messages?

Ack.


> On March 10, 2014, 5:23 p.m., Jun Rao wrote:
> > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala, 
> > lines 170-171
> > <https://reviews.apache.org/r/18299/diff/7/?file=514024#file514024line170>
> >
> >     Could we also verify the content of the fetched messages?

Ack.


> On March 10, 2014, 5:23 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Record.java, lines 
> > 91-94
> > <https://reviews.apache.org/r/18299/diff/7/?file=514018#file514018line91>
> >
> >     It's still weird to always write the data as uncompressed. In 
> > RecordTest, the compressed type is not really being exercised since the 
> > record is created as uncompressed. Perhaps we should just change the unit 
> > test. If the compression type is not none, we will have to decompress the 
> > value to get the uncompressed content.

I added comments to explain this more clearly.
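
For reference, verifying a compressed record's value in the unit test would
look roughly like this for the GZIP case (a sketch assuming a hypothetical
helper; the actual RecordTest code differs):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPInputStream;

    public class DecompressSketch {
        // If the record's compression type is not none, decompress the stored
        // value before comparing it with the original payload.
        public static byte[] gunzip(byte[] compressed) throws IOException {
            try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] chunk = new byte[1024];
                int n;
                while ((n = in.read(chunk)) != -1)
                    out.write(chunk, 0, n);
                return out.toByteArray();
            }
        }
    }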


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review36658
-----------------------------------------------------------


On March 8, 2014, 12:36 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated March 8, 2014, 12:36 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporate Jun's comments round two
> 
> Refactored compression logic into Compressor
> 
> GZIP/Snappy integration tests
> 
> 
> Diffs
> -----
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1ac69436f117800815b8d50f042e9e2a29364b43 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 32e12ad149f6d70c96a498d0a390976f77bf9e2a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  616e1006f21c54af8260e84a96928cb8893ceb7c 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
> 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 
> f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 0c6b3656375721a718fb4de10118170aacce0ea9 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
> b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java 
> ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 
> 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
> PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> c002f5ea38ece66ad559fadb18ffaf40ac2026aa 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> integration tests
> 
> snappy dynamic load test
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
