-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review36525
-----------------------------------------------------------
Does the unit test pass? It seems to hang on ProducerFailureHandlingTest.


build.gradle
<https://reviews.apache.org/r/18299/#comment67519>

    This should be uncommented.


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/18299/#comment67521>

    Why is this in error?


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/18299/#comment67524>

    Could this be part of the constructor of MemoryRecords?


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/18299/#comment67523>

    This should probably be a warning.


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/18299/#comment67534>

    Could this be part of the constructor of MemoryRecords?


clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment67529>

    BufferOverflowException is not an IOException; it's a RuntimeException. Is that handled properly?


clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment67525>

    Since this is just a warning, we probably shouldn't include "error" in the message.


clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment67526>

    Ditto as the above.


clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment67527>

    Do we need to log the exception? Why is this not consistent with the logging in putInt()?


clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment67530>

    Ditto as the above.


clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment67533>

    Could this be part of the constructor?


clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment67535>

    Throwing an exception is a bit expensive. We should probably avoid writing to the compressor after the first failure.


clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment67537>

    This check seems unnecessary. If there are fewer bytes in the stream than size, a BufferUnderflowException will be thrown.


clients/src/main/java/org/apache/kafka/common/record/Record.java
<https://reviews.apache.org/r/18299/#comment67541>

    Since we always interpret the bytes as uncompressed, do we need to pass in the codec in the constructor?


core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
<https://reviews.apache.org/r/18299/#comment67544>

    I am wondering if it's better to break the test into two. The first test would cover the normal case with compression; we could probably write one parameterized test (supported in JUnit) and bind it to gzip and snappy. The second test would cover the overflow case.


- Jun Rao


On March 7, 2014, 1:49 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated March 7, 2014, 1:49 a.m.)
> 
> 
> Review request for kafka.
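On the exception-hierarchy point in comment 67529: a minimal stand-alone sketch (class names are all from java.nio; the demo class itself is hypothetical) showing that BufferOverflowException is unchecked and would bypass any handling aimed at IOException:

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

public class OverflowDemo {
    public static void main(String[] args) {
        // BufferOverflowException extends RuntimeException, so it is unchecked:
        // it does not match a catch (IOException e) clause, and the compiler
        // never forces callers to handle it.
        System.out.println(RuntimeException.class
                .isAssignableFrom(BufferOverflowException.class)); // true

        ByteBuffer buffer = ByteBuffer.allocate(4); // room for a single int
        try {
            buffer.putInt(1);
            buffer.putInt(2); // second write exceeds capacity
        } catch (BufferOverflowException e) {
            System.out.println("caught unchecked "
                    + e.getClass().getSimpleName());
        }
    }
}
```

So a try/catch in Compressor that only handles IOException would let this exception propagate to the caller.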
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Refactored Compression logic into Compressor
> 
> GZIP/Snappy Integration Tests
> 
> 
> Diffs
> -----
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1ac69436f117800815b8d50f042e9e2a29364b43 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java f8748960fe9a02d52f7caae675abec503df91a39 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 616e1006f21c54af8260e84a96928cb8893ceb7c 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 038a05a94b795ec0a95b2d40a89222394b5a74c4 
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 9d8935fa3beeb2a78b109a41ed76fd4374239560 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java f1dc9778502cbdfe982254fb6e25947842622239 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 0c6b3656375721a718fb4de10118170aacce0ea9 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b0745b528cef929c4273f7e2ac4de1476cfc25ad 
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java ae54d67da9907b0a043180c7395a1370b3d0528d 
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 36cfc0fda742eb501af2c2c0330e3f461cf1f40c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala c002f5ea38ece66ad559fadb18ffaf40ac2026aa 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> integration tests
> 
> snappy dynamic load test
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
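The "avoid writing to the compressor after the first failure" suggestion (comment 67535) could look roughly like the following. This is a hypothetical stand-alone sketch, not the actual Compressor code: the first IOException is latched in a flag, later writes become cheap no-ops, and the caller checks the flag once instead of paying for an exception per append.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch: latch the first write failure instead of
// throwing on every subsequent append.
public class GuardedWriter {
    private final OutputStream out;
    private boolean writeFailed = false;

    public GuardedWriter(OutputStream out) {
        this.out = out;
    }

    public void write(byte[] data) {
        if (writeFailed) {
            return; // skip all work after the first failure
        }
        try {
            out.write(data);
        } catch (IOException e) {
            writeFailed = true; // record the error; don't rethrow per call
        }
    }

    // Caller inspects this once, e.g. when closing the batch.
    public boolean hasWriteError() {
        return writeFailed;
    }
}
```

With this shape, a full batch whose underlying stream died on the first record costs one exception in total rather than one per record.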
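The split-and-parameterize idea for ProducerSendTest (comment 67544) is, in JUnit, an @RunWith(Parameterized.class) class whose @Parameters method yields the codec names. The dependency-free sketch below just mimics that shape by looping over the codec list and doing a compression round trip per codec; the class and method names are hypothetical, and only gzip is exercised for real since snappy needs an external library.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressionRoundTripSketch {

    // Compress then decompress, returning the recovered bytes.
    static byte[] gzipRoundTrip(byte[] input) throws Exception {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(compressed)) {
            out.write(input);
        }
        ByteArrayOutputStream recovered = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(
                new ByteArrayInputStream(compressed.toByteArray()))) {
            byte[] buf = new byte[1024];
            for (int n; (n = in.read(buf)) != -1; ) {
                recovered.write(buf, 0, n);
            }
        }
        return recovered.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = "hello compression".getBytes(StandardCharsets.UTF_8);
        // In JUnit these names would come from the @Parameters method;
        // here we simply iterate. Snappy is listed but skipped because
        // it requires an external jar.
        for (String codec : Arrays.asList("gzip", "snappy")) {
            if (codec.equals("gzip")) {
                byte[] back = gzipRoundTrip(payload);
                System.out.println(codec + " round trip ok: "
                        + Arrays.equals(payload, back));
            } else {
                System.out.println(codec + " skipped in this sketch");
            }
        }
    }
}
```

The overflow case would then live in its own test method, so a failure there doesn't obscure whether the normal compressed path works.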