> On March 16, 2014, 6:42 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines 49-50
> > <https://reviews.apache.org/r/18299/diff/12/?file=520198#file520198line49>
> >
> > We probably can call Compressor.putRecord() directly since when key and
> > value are null, Record.write and Compressor.putRecord are doing the same
> > thing. Currently, it's a bit weird that Compressor calls Record.write,
> > which calls putRecord back in Compressor.
That is a great point. The problem is that this call now sits inside Compressor's constructor, where it should use a compressor with compression type NONE. I could copy Record's write logic for NONE compression into Compressor, but that would put the same logic in two places, so I would prefer to leave it as is, unless you have a better suggestion.

> On March 16, 2014, 6:42 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines 169-170
> > <https://reviews.apache.org/r/18299/diff/12/?file=520198#file520198line169>
> >
> > The latter estimate seems more accurate. Could you explain why we need
> > the first estimate?

I think the latter is not always more accurate, since it assumes the whole block (whose size defaults to the batch size, 16KB) is full, which is rarely the case. Anyway, I will run some perf tests on the effectiveness of these heuristics.

> On March 16, 2014, 6:42 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, lines 42-43
> > <https://reviews.apache.org/r/18299/diff/12/?file=520199#file520199line42>
> >
> > It seems that in all callers, writable is always true.

For now, yes. It is meant for the consumer, where writable will be false. I also added this boolean to avoid calling maybeInit on every append.

> On March 16, 2014, 6:42 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, lines 168-169
> > <https://reviews.apache.org/r/18299/diff/12/?file=520199#file520199line168>
> >
> > Not sure that I understand the comment on depending on remaining.

I updated the comments. Does this sound more understandable?
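To make the point about the second written-bytes estimate concrete, here is a hypothetical Java sketch. It assumes the first estimate scales the uncompressed bytes by an assumed compression ratio, and that the second adds one full compression block to the bytes already flushed into the buffer. `WrittenBytesEstimate`, `ratioEstimate`, and `fullBlockEstimate` are illustrative names, not the code in Compressor.java.

```java
/**
 * Hypothetical sketch of two ways to estimate how many bytes a compressing
 * stream has written into its output buffer. Names and formulas are
 * illustrative assumptions, not the code under review.
 */
public final class WrittenBytesEstimate {

    /** Estimate 1: scale the uncompressed bytes by an assumed compression ratio. */
    static long ratioEstimate(long uncompressedBytesWritten, double assumedCompressionRatio) {
        return (long) Math.ceil(uncompressedBytesWritten * assumedCompressionRatio);
    }

    /**
     * Estimate 2: bytes already flushed to the buffer plus one full compression
     * block, i.e. it assumes the block still held by the compressor is full.
     */
    static long fullBlockEstimate(long bytesFlushedToBuffer, int blockSize) {
        return bytesFlushedToBuffer + blockSize;
    }

    public static void main(String[] args) {
        int blockSize = 16 * 1024;    // assumed to default to the batch size, 16KB
        long uncompressed = 2 * 1024; // only 2KB appended so far
        long flushed = 0;             // nothing flushed from the block yet
        double ratio = 0.5;           // assumed compression ratio

        System.out.println("ratio-based estimate: " + ratioEstimate(uncompressed, ratio));
        System.out.println("full-block estimate:  " + fullBlockEstimate(flushed, blockSize));
    }
}
```

With only 2KB appended and nothing flushed yet, the full-block estimate reports 16384 bytes against 1024 for the ratio-based one; it is only tight when the pending block is actually full.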
> On March 16, 2014, 6:42 p.m., Jun Rao wrote:
> > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala, lines 124-125
> > <https://reviews.apache.org/r/18299/diff/12/?file=520207#file520207line124>
> >
> > Instead of duplicating the input messages, perhaps we could store them
> > in an array initially?

Unfortunately we cannot for now, since on the producer side it is a ProducerRecord while on the consumer side it is a Message, and it is hard to translate between the two. Also, I am not sure we really need to avoid duplication across the test cases of a parameterized test.

- Guozhang

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18299/#review37311
-----------------------------------------------------------

On March 15, 2014, 12:40 a.m., Guozhang Wang wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18299/
> -----------------------------------------------------------
>
> (Updated March 15, 2014, 12:40 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1253
>     https://issues.apache.org/jira/browse/KAFKA-1253
>
>
> Repository: kafka
>
>
> Description
> -------
>
> In-place compression with
>
> 1) Dynamic reallocation in the underlying byte buffer
> 2) Written bytes estimate to reduce reallocation probabilities
> 3) Deallocation in buffer pool following the original capacity
>
>
> Diffs
> -----
>
>   build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1ac69436f117800815b8d50f042e9e2a29364b43
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 32e12ad149f6d70c96a498d0a390976f77bf9e2a
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java 5bed60730ea108684bea2440af5a008feb0ada61
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 7a03f389cc6a4d56152b882555d7498af9a04d20
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 038a05a94b795ec0a95b2d40a89222394b5a74c4
>   clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 3ebbb804242be6a001b3bae6524afccc85a87602
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 906da02d02c03aadd8ab73ed2fc9a1898acb8d72
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 9d8935fa3beeb2a78b109a41ed76fd4374239560
>   clients/src/main/java/org/apache/kafka/common/record/Record.java f1dc9778502cbdfe982254fb6e25947842622239
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 0c6b3656375721a718fb4de10118170aacce0ea9
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b0745b528cef929c4273f7e2ac4de1476cfc25ad
>   clients/src/test/java/org/apache/kafka/common/record/RecordTest.java ae54d67da9907b0a043180c7395a1370b3d0528d
>   clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/test/TestUtils.java 36cfc0fda742eb501af2c2c0330e3f461cf1f40c
>   core/src/main/scala/kafka/producer/ConsoleProducer.scala dd39ff22c918fe5b05f04582b748e32349b2055f
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala PRE-CREATION
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala c002f5ea38ece66ad559fadb18ffaf40ac2026aa
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1
>   perf/src/main/scala/kafka/perf/ProducerPerformance.scala f12a45becb11a8bed586024866235b268630fec6
>
> Diff: https://reviews.apache.org/r/18299/diff/
>
>
> Testing
> -------
>
> integration tests
>
> unit tests
>
> stress tests (1K message size, 1M messages in producer performance with ack = 1, linger time = 0ms/500ms, random bit/all-ones messages)
>
> snappy dynamic load test
>
>
> Thanks,
>
> Guozhang Wang
>
>
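Points 1) and 3) of the description can be sketched roughly as follows. This is a hypothetical Java illustration, not the ByteBufferOutputStream in this patch: the class name `GrowableByteBufferOutputStream`, the 1.1 growth factor, and the `initialCapacity()` accessor are assumptions. The stream reallocates a larger buffer when a write does not fit, and it remembers the capacity it started with so that memory can be returned to a pool by that size.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
 * Hypothetical sketch: an OutputStream over a ByteBuffer that reallocates a
 * larger buffer when a write does not fit, and remembers the original
 * capacity for deallocation. Names and growth factor are assumptions.
 */
public class GrowableByteBufferOutputStream extends OutputStream {

    private static final float REALLOCATION_FACTOR = 1.1f; // assumed growth factor

    private final int initialCapacity;
    private ByteBuffer buffer;

    public GrowableByteBufferOutputStream(ByteBuffer buffer) {
        this.buffer = buffer;
        this.initialCapacity = buffer.capacity();
    }

    @Override
    public void write(int b) {
        ensureRemaining(1);
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        ensureRemaining(len);
        buffer.put(bytes, off, len);
    }

    /** The current buffer, which may be a reallocated copy of the original. */
    public ByteBuffer buffer() {
        return buffer;
    }

    /** The capacity the stream started with; a pool would account by this size. */
    public int initialCapacity() {
        return initialCapacity;
    }

    private void ensureRemaining(int needed) {
        if (buffer.remaining() >= needed)
            return;
        // Grow by the factor, but at least enough to hold this write.
        int required = buffer.position() + needed;
        int grown = (int) (buffer.capacity() * REALLOCATION_FACTOR);
        ByteBuffer bigger = ByteBuffer.allocate(Math.max(grown, required));
        buffer.flip();   // old buffer now spans [0, bytes written so far)
        bigger.put(buffer); // copy written bytes; bigger's position ends after them
        buffer = bigger;
    }
}
```

For example, writing 10 bytes into a stream that started with a 4-byte buffer leaves `buffer().capacity()` at 10 while `initialCapacity()` still reports 4, so a pool that handed out 4 bytes would presumably be credited with those 4 rather than with the grown size.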