> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java, line 58
> > <https://reviews.apache.org/r/18299/diff/14/?file=526582#file526582line58>
> >
> > What is the reason for this check?
There is a corner case where the heuristic used to estimate the written bytes could return a value larger than the capacity if we used the compressor's buffer size. Since I have removed that heuristic for now, this case may never be hit, but I think it is better to keep the check in case we switch to some other heuristic in the future.


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, line 69
> > <https://reviews.apache.org/r/18299/diff/14/?file=526589#file526589line69>
> >
> > Do we actually need this method? This is yet another place we define the message format.

This function is used in some unit tests. Besides, I think it does not actually touch the message format, only the log-entry format.


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, line 71
> > <https://reviews.apache.org/r/18299/diff/14/?file=526588#file526588line71>
> >
> > Why isn't this done in MemoryRecords? This means we are defining the format in multiple places.

As I understand it, MemoryRecords does not actually know the format of the message; all it knows is the format of the LogEntry, which is

    [offset, message-size, message]

The format inside the message is completely opaque to MemoryRecords and is handled by Compressor/Record:

    [crc, magic, attributes, key-size, key, value-size, value]

When compression is used, we need to build the shallow message out of the compressed messages, and I think this logic fits naturally in Compressor.


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, line 103
> > <https://reviews.apache.org/r/18299/diff/14/?file=526588#file526588line103>
> >
> > These should not be swallowed.
The only reason I swallow them here is that the IOExceptions should theoretically never be thrown: the underlying stream is backed by a byte buffer, which does no actual I/O and hence never throws IOException. But since the function signature does declare throws IOException, I have to handle it here. I could also wrap it in the runtime KafkaException; let me know which one you prefer.


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java, line 31
> > <https://reviews.apache.org/r/18299/diff/14/?file=526586#file526586line31>
> >
> > I strongly recommend you inline all of this into Compressor. This isn't really a factory for Compressions, it is an OutputStreamFactory, but splitting it into another class doesn't seem to buy us much.

I did this to isolate the Compressor from the specific compression types. I can change it back by merging Compressor with CompressionFactory.


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, line 62
> > <https://reviews.apache.org/r/18299/diff/14/?file=526589#file526589line62>
> >
> > Consider replacing this with a factory method MemoryRecords.emptyRecords(int size)
> >
> > The problem with boolean flags in Java is that there are no named parameters, so it is really hard to remember what the parameter is when you see
> >     new MemoryRecords(buffer, type, true)

Great point.


> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java, line 88
> > <https://reviews.apache.org/r/18299/diff/14/?file=526586#file526586line88>
> >
> > ConcurrentModificationException. Make this an array of floats.

Using a list instead.
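As an aside on the last point, here is a minimal, hypothetical sketch (not code from this patch) of the hazard the reviewer is flagging: structurally modifying an `ArrayList` while a for-each loop iterates it fails fast with `ConcurrentModificationException`, something a plain `float[]` cannot do because reads never go through a fail-fast iterator.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class CmeSketch {
    // Mutating a list while a for-each loop iterates it trips the
    // iterator's fail-fast modCount check on the next element fetch.
    public static boolean modifyWhileIterating() {
        List<Float> rates = new ArrayList<>();
        rates.add(1.0f);
        rates.add(2.0f);
        try {
            for (float r : rates) {
                rates.add(r * 2); // structural modification mid-iteration
            }
            return false; // iteration completed without complaint
        } catch (ConcurrentModificationException e) {
            return true;  // fail-fast iterator caught the modification
        }
    }

    public static void main(String[] args) {
        System.out.println(modifyWhileIterating()); // prints "true"
    }
}
```

The same failure mode appears without any mutation in the loop body when another thread updates the list concurrently, which is why a fixed-size array (or a concurrency-safe collection) is the safer structure for shared stats.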
> On March 19, 2014, 3:17 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java, line 39
> > <https://reviews.apache.org/r/18299/diff/14/?file=526586#file526586line39>
> >
> > I am concerned about the use of DataOutputStream as it adds another layer of synchronization.

OK, removing the DataOutputStream.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review37679
-----------------------------------------------------------


On March 19, 2014, 12:11 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
> 
> (Updated March 19, 2014, 12:11 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> In-place compression with
> 
> 1) Dynamic reallocation in the underlying byte buffer
> 2) Written bytes estimate to reduce reallocation probabilities
> 3) Deallocation in buffer pool following the original capacity
> 
> 
> Diffs
> -----
> 
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1ac69436f117800815b8d50f042e9e2a29364b43
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 32e12ad149f6d70c96a498d0a390976f77bf9e2a
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b69866a9fb9a8b4e1e78d304a20eda3cbf178c6f
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 673b2962771c28ceb3c7a6c0fd6f69521bd7ed16
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 
>   038a05a94b795ec0a95b2d40a89222394b5a74c4
>   clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 3ebbb804242be6a001b3bae6524afccc85a87602
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 906da02d02c03aadd8ab73ed2fc9a1898acb8d72
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 9d8935fa3beeb2a78b109a41ed76fd4374239560
>   clients/src/main/java/org/apache/kafka/common/record/Record.java f1dc9778502cbdfe982254fb6e25947842622239
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 0c6b3656375721a718fb4de10118170aacce0ea9
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b0745b528cef929c4273f7e2ac4de1476cfc25ad
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java ae54d67da9907b0a043180c7395a1370b3d0528d
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 36cfc0fda742eb501af2c2c0330e3f461cf1f40c
>   core/src/main/scala/kafka/producer/ConsoleProducer.scala dd39ff22c918fe5b05f04582b748e32349b2055f
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala PRE-CREATION
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala c002f5ea38ece66ad559fadb18ffaf40ac2026aa
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1
>   perf/src/main/scala/kafka/perf/ProducerPerformance.scala 3df0d130308a35fca96184adc21eeee2eea1488f
> 
> Diff: https://reviews.apache.org/r/18299/diff/
> 
> 
> Testing
> -------
> 
> integration tests
> 
> unit tests
> 
> stress tests (1K message size, 1M messages in producer performance with ack = 1, linger time = 0ms/500ms, random bit/all-ones messages)
> 
> snappy dynamic load test
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>