-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review37679
-----------------------------------------------------------
High-level comment: I'm not sure I follow the code organization strategy across MemoryRecords, Compressor, and CompressionFactory. I'm pretty sure CompressionFactory and Compressor should be combined. The byte format needs to be defined in a single place, not split between two or three places. Can you think of a cleaner way to divide these up? Maybe just combine Compressor and CompressionFactory and move the close() logic in Compressor into MemoryRecords?


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/18299/#comment69308>

What is the reason for this check?


clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment69313>

I strongly recommend you inline all of this into Compressor. This isn't really a factory for Compressions; it is an OutputStreamFactory, and splitting it into another class doesn't seem to buy us much.


clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment69319>

This isn't much damping. Maybe 0.9?


clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment69315>

I am concerned about the use of DataOutputStream, as it adds another layer of synchronization.


clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment69314>

I don't think this is needed. You should be able to refer to the class directly inside an if statement; the class is only loaded if that branch is taken.


clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java
<https://reviews.apache.org/r/18299/#comment69317>

This risks a ConcurrentModificationException. Make this an array of floats.


clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment69324>

Why isn't this done in MemoryRecords? This means we are defining the format in multiple places.


clients/src/main/java/org/apache/kafka/common/record/Compressor.java
<https://reviews.apache.org/r/18299/#comment69318>

These should not be swallowed.


clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment69320>

Consider making this private.


clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment69323>

You need to cap the compression block size to something reasonable. If I use 256K batches, that doesn't mean I want you to allocate 512K of memory. There can't possibly be value in a block size of more than 1-4K, right?


clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment69321>

Consider replacing this with a factory method, MemoryRecords.emptyRecords(int size). The problem with boolean flags in Java is that there are no named parameters, so it is really hard to remember what the parameter means when you see new MemoryRecords(buffer, type, true).


clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
<https://reviews.apache.org/r/18299/#comment69325>

Do we actually need this method? This is yet another place where we define the message format.


clients/src/main/java/org/apache/kafka/common/record/Record.java
<https://reviews.apache.org/r/18299/#comment69322>

Please, no TODOs unless we are planning to do them in the next few weeks; they make the code messy. I don't think we are going to break the protocol, but if you think we should, let's file a JIRA.


clients/src/main/java/org/apache/kafka/common/utils/Utils.java
<https://reviews.apache.org/r/18299/#comment69306>

Let's move this into CRC.java too, as a static method.


clients/src/main/java/org/apache/kafka/common/utils/Utils.java
<https://reviews.apache.org/r/18299/#comment69305>

This should go in Crc.java as a non-static method, no?
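The CompressionFactory comments suggesting more damping (about 0.9) and an array of floats instead of a shared collection presumably concern the compression-ratio estimate behind the "written bytes estimate" in the quoted request. A minimal sketch of that idea follows. The class name CompressionRatioEstimate, its methods, the nested Type enum, and the type ids are all hypothetical illustrations, not code from the patch; the only value taken from the review is the 0.9 damping factor.

```java
import java.util.Arrays;

// Hypothetical sketch, not the patch's code: a damped (exponentially weighted)
// compression-ratio estimate, one float slot per compression type.
class CompressionRatioEstimate {

    // Illustrative stand-in for CompressionType; these ids are assumptions.
    enum Type {
        NONE(0), GZIP(1), SNAPPY(2);

        final int id;

        Type(int id) {
            this.id = id;
        }
    }

    // Weight kept for the previous estimate; 0.9 is the value suggested in review.
    static final float DAMPING = 0.9f;

    // Indexed by Type.id. Reading or writing an array slot cannot throw
    // ConcurrentModificationException the way iterating a shared, concurrently
    // modified map can; a racing update may be lost, which is tolerable here.
    private final float[] ratios = new float[Type.values().length];

    CompressionRatioEstimate() {
        Arrays.fill(ratios, 1.0f); // assume no compression until we observe some
    }

    // Blend an observed ratio (compressed size / uncompressed size) into the estimate.
    void update(Type type, float observedRatio) {
        ratios[type.id] = DAMPING * ratios[type.id] + (1 - DAMPING) * observedRatio;
    }

    float ratio(Type type) {
        return ratios[type.id];
    }

    // Expected compressed size of uncompressedBytes, rounded up.
    int estimateWrittenBytes(Type type, int uncompressedBytes) {
        return (int) Math.ceil(uncompressedBytes * ratios[type.id]);
    }

    public static void main(String[] args) {
        CompressionRatioEstimate estimate = new CompressionRatioEstimate();
        // One batch that compressed to half its size moves the estimate only 10% of the way.
        estimate.update(Type.GZIP, 0.5f);
        System.out.println("gzip ratio estimate: " + estimate.ratio(Type.GZIP));
        System.out.println("estimated bytes for 1000 input bytes: "
                + estimate.estimateWrittenBytes(Type.GZIP, 1000));
    }
}
```

With a damping factor of 0.9, a single unusually compressible (or incompressible) batch shifts the estimate by only a tenth of the difference, so the written-bytes estimate does not swing from batch to batch.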
- Jay Kreps


On March 19, 2014, 12:11 a.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
>
> (Updated March 19, 2014, 12:11 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1253
> https://issues.apache.org/jira/browse/KAFKA-1253
>
>
> Repository: kafka
>
>
> Description
> -------
>
> In-place compression with
>
> 1) Dynamic reallocation in the underlying byte buffer
> 2) Written bytes estimate to reduce reallocation probabilities
> 3) Deallocation in buffer pool following the original capacity
>
>
> Diffs
> -----
>
> build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f
> clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1ac69436f117800815b8d50f042e9e2a29364b43
> clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 32e12ad149f6d70c96a498d0a390976f77bf9e2a
> clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b69866a9fb9a8b4e1e78d304a20eda3cbf178c6f
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 673b2962771c28ceb3c7a6c0fd6f69521bd7ed16
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 038a05a94b795ec0a95b2d40a89222394b5a74c4
> clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 3ebbb804242be6a001b3bae6524afccc85a87602
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 906da02d02c03aadd8ab73ed2fc9a1898acb8d72
> clients/src/main/java/org/apache/kafka/common/record/Compressor.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 9d8935fa3beeb2a78b109a41ed76fd4374239560
> clients/src/main/java/org/apache/kafka/common/record/Record.java f1dc9778502cbdfe982254fb6e25947842622239
> clients/src/main/java/org/apache/kafka/common/utils/Utils.java 0c6b3656375721a718fb4de10118170aacce0ea9
> clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b0745b528cef929c4273f7e2ac4de1476cfc25ad
> clients/src/test/java/org/apache/kafka/common/record/RecordTest.java ae54d67da9907b0a043180c7395a1370b3d0528d
> clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java PRE-CREATION
> clients/src/test/java/org/apache/kafka/test/TestUtils.java 36cfc0fda742eb501af2c2c0330e3f461cf1f40c
> core/src/main/scala/kafka/producer/ConsoleProducer.scala dd39ff22c918fe5b05f04582b748e32349b2055f
> core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala PRE-CREATION
> core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala c002f5ea38ece66ad559fadb18ffaf40ac2026aa
> core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1
> perf/src/main/scala/kafka/perf/ProducerPerformance.scala 3df0d130308a35fca96184adc21eeee2eea1488f
>
> Diff: https://reviews.apache.org/r/18299/diff/
>
>
> Testing
> -------
>
> integration tests
>
> unit tests
>
> stress tests (1K message size, 1M messages in producer performance with ack = 1, linger time = 0ms/500ms, random bit/all-ones messages)
>
> snappy dynamic load test
>
>
> Thanks,
>
> Guozhang Wang
>
>
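Item 1 of the quoted description, dynamic reallocation in the underlying byte buffer, can be sketched as an OutputStream that copies what it has written into a larger ByteBuffer whenever a write would overflow. This is a minimal illustration under stated assumptions: the class name GrowableByteBufferOutputStream, the buffer() accessor, and the 1.1 growth factor are hypothetical and are not the ByteBufferOutputStream from the patch.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch: an OutputStream over a ByteBuffer that reallocates
// (copy into a larger heap buffer) when a write does not fit.
class GrowableByteBufferOutputStream extends OutputStream {

    // Assumed growth factor; the patch's actual policy is not shown in this review.
    private static final float REALLOCATION_FACTOR = 1.1f;

    private ByteBuffer buffer;

    GrowableByteBufferOutputStream(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void write(int b) {
        ensureRemaining(1);
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        ensureRemaining(len);
        buffer.put(bytes, off, len);
    }

    // The current buffer; after growth this is no longer the one passed to the constructor.
    ByteBuffer buffer() {
        return buffer;
    }

    // If `needed` more bytes do not fit, copy the written bytes into a larger buffer.
    private void ensureRemaining(int needed) {
        if (buffer.remaining() >= needed) {
            return;
        }
        int required = buffer.position() + needed;
        int grown = (int) (buffer.capacity() * REALLOCATION_FACTOR);
        ByteBuffer bigger = ByteBuffer.allocate(Math.max(required, grown));
        buffer.flip();      // old buffer now exposes [0, position) for reading
        bigger.put(buffer); // copy everything written so far
        buffer = bigger;    // subsequent writes continue after the copied bytes
    }

    public static void main(String[] args) {
        GrowableByteBufferOutputStream out =
                new GrowableByteBufferOutputStream(ByteBuffer.allocate(4));
        out.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 0, 10);
        System.out.println("position=" + out.buffer().position()
                + " capacity=" + out.buffer().capacity());
    }
}
```

Because growth replaces the buffer with a fresh allocation, callers have to read the result through buffer() rather than the ByteBuffer they passed in. That is presumably also why item 3 returns memory to the buffer pool according to the original capacity: the grown buffer was never handed out by the pool.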