-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review35245
-----------------------------------------------------------


A couple of high-level comments:

1. RecordBatch: If compression is enabled in the producer, each Records will be
a CompressedMemoryRecords. The tricky part will be in tryAppend(). In a corner
case, the compressed message can be bigger than the uncompressed version, so
records.append() would fail even though records.hasRoom() returns true. When
this happens, ByteBuffer.put() throws BufferOverflowException, which is a
RuntimeException. So, in this corner case, we need to catch this exception and
treat it as if there is no more room.

2. RecordAccumulator.append(): There is another tricky case in calculating the
size when creating a new RecordBatch. Again, in a corner case, the compressed
message can be bigger than the uncompressed version. So, in the rare case, we
may not be able to put even a single compressed record into a newly allocated
RecordBatch. I'm not sure what the best way to handle this is. We could
(1) just send this record uncompressed (since compressing doesn't really help);
or (2) keep allocating a bigger buffer until the record can be put in.
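
To make the two points above concrete, here is a rough sketch. BatchSketch is a
hypothetical stand-in, not the actual RecordBatch, and the 4-byte length prefix
is only an assumed framing. tryAppend() treats BufferOverflowException as "no
more room", and newBatchFor() illustrates option (2) by doubling the capacity
until the record fits.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Hypothetical stand-in for RecordBatch; not the actual Kafka class.
class BatchSketch {
    private final ByteBuffer buffer;

    BatchSketch(int capacity) {
        this.buffer = ByteBuffer.allocate(capacity);
    }

    // Returns false when the record does not fit, including the corner case
    // where the write overflows even though a room check would have passed.
    boolean tryAppend(byte[] compressedRecord) {
        int start = buffer.position();
        try {
            buffer.putInt(compressedRecord.length); // assumed length prefix
            buffer.put(compressedRecord);
            return true;
        } catch (BufferOverflowException e) {
            buffer.position(start); // discard any partial write
            return false;
        }
    }

    // Option (2): keep doubling the capacity until a fresh batch can hold
    // the record.
    static BatchSketch newBatchFor(byte[] compressedRecord, int initialCapacity) {
        int capacity = Math.max(initialCapacity, 1);
        while (true) {
            BatchSketch batch = new BatchSketch(capacity);
            if (batch.tryAppend(compressedRecord)) {
                return batch;
            }
            capacity *= 2;
        }
    }

    int capacity() {
        return buffer.capacity();
    }

    int bytesWritten() {
        return buffer.position();
    }
}
```

Option (1) would skip the retry loop and write the record with no compression
instead; which of the two is better probably depends on how often this case
shows up in practice.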




clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java
<https://reviews.apache.org/r/18299/#comment65699>

    unused import.



clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java
<https://reviews.apache.org/r/18299/#comment65698>

    This method should be private.



clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java
<https://reviews.apache.org/r/18299/#comment65704>

    This is actually not the right place to add the offset and the crc for the
    shallow iterator. This method is probably intended for writing the produce
    request to the socket without first serializing the request into a separate
    buffer. However, we don't do that now and this method is not currently
    being used. On the producer side, when a Records is ready to be sent, we
    get its underlying ByteBuffer and use it in the ProducerRequest, which then
    serializes the bytes into a send ByteBuffer.
    
    So, we need to add the offset/crc for the shallow message when a Records is
    ready to be sent. Potentially, we can add a close() method in Record and
    call it in RecordAccumulator.ready() when a Records is determined to be
    ready. The offset/crc for the shallow message can be added in close().
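
    A minimal sketch of the close() idea, under assumptions: the class name
    and the [offset: 8 bytes][crc: 4 bytes][payload] layout below are
    hypothetical and much simpler than the real message header. The point is
    only that header space is reserved up front and back-filled once the
    buffer is ready to be sent.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical layout for illustration only: [offset: 8][crc: 4][payload...].
// The real Kafka message header has more fields.
class ShallowHeaderSketch {
    static final int HEADER_SIZE = 8 + 4;

    private final ByteBuffer buffer;
    private boolean closed = false;

    ShallowHeaderSketch(int capacity) {
        buffer = ByteBuffer.allocate(capacity);
        buffer.position(HEADER_SIZE); // reserve room for the header up front
    }

    void append(byte[] payload) {
        if (closed) {
            throw new IllegalStateException("already closed");
        }
        buffer.put(payload);
    }

    // Called once the buffer is ready to send: back-fill the offset and the
    // CRC of the payload, then flip the buffer for reading. Later calls
    // return the same contents and ignore the offset argument.
    ByteBuffer close(long offset) {
        if (!closed) {
            int end = buffer.position();
            CRC32 crc = new CRC32();
            crc.update(buffer.array(), HEADER_SIZE, end - HEADER_SIZE);
            buffer.putLong(0, offset);
            buffer.putInt(8, (int) crc.getValue());
            buffer.flip();
            closed = true;
        }
        return buffer.duplicate();
    }
}
```

    The absolute putLong/putInt calls leave the write position alone, so the
    header can be filled in after all the payload bytes have been appended.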



clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java
<https://reviews.apache.org/r/18299/#comment65705>

    Instead of copying the data to a new ByteBuffer, we can let Record reuse
    the portion of the original ByteBuffer passed into
    CompressedRecordsIterator.
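
    For example, something along these lines would give a view over part of
    the original buffer without copying. SliceSketch and view() are
    hypothetical names; only the standard ByteBuffer duplicate()/slice()
    calls are used.

```java
import java.nio.ByteBuffer;

class SliceSketch {
    // Returns a view over [start, start + size) of source. The view shares
    // the underlying bytes with source; the source's own position and limit
    // are left untouched because we work on a duplicate.
    static ByteBuffer view(ByteBuffer source, int start, int size) {
        ByteBuffer dup = source.duplicate();
        dup.limit(start + size); // set limit first so position(start) is valid
        dup.position(start);
        return dup.slice();
    }
}
```

    Since the view shares memory with the original, this only works if nothing
    overwrites that region while the Record is still in use.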



clients/src/main/java/org/apache/kafka/common/record/Record.java
<https://reviews.apache.org/r/18299/#comment65702>

    This doesn't handle the compression codec.


- Jun Rao


On Feb. 22, 2014, 1:57 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated Feb. 22, 2014, 1:57 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> In-place compression with Crc32 pre-computation.
> 
> In-place decompression embedded in nested iterator.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressedMemoryRecords.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 9c34e7dc82f33df7406cad0e64eb6a896d068dc6 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
