> On March 10, 2014, 5:23 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines > > 100-101 > > <https://reviews.apache.org/r/18299/diff/7/?file=514016#file514016line100> > > > > Would it be better to write the code as: > > > > catch (Exception e) { > > if (e instanceof IOException || e instanceof IOException) { > > // do sth > > } > > > > This avoids duplicating the code and we can just say I/O exception in > > the log.
I would personally prefer the current code layout since 1) IOException and BufferOverflowException are actually for different cases, the first one is not expected while the other is; we handle IOException here only to not expose them in MemoryRecords; 2) If we catch Exception, then for "else" we'd better re-throw them, which will make the function having to be declared as throws Exceptions. > On March 10, 2014, 5:23 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, lines > > 111-112 > > <https://reviews.apache.org/r/18299/diff/7/?file=514016#file514016line111> > > > > Do we need the input to be final? Not necessarily, I put them here just to make sure they are not altered at any cases. Do you think there is a reason we should better remove them? > On March 10, 2014, 5:23 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, > > lines 50-53 > > <https://reviews.apache.org/r/18299/diff/7/?file=514017#file514017line50> > > > > If we have a separate init(), every caller has to remember calling this > > after the construction of MemoryRecord. Is this worth doing lazy init for > > compressor? If it's really worth doing, perhaps we can do the lazy init in > > each of the func (e.g. append) that needs the compressor. I think it is necessary to do lazy init, but just efficient. Since while doing the init for compression the header will be re-written, which we do not want while reading a memory records. I can move the init() to Compressor and check that every time though. > On March 10, 2014, 5:23 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, > > lines 178-179 > > <https://reviews.apache.org/r/18299/diff/7/?file=514017#file514017line178> > > > > Do we need to wrap it or just let it pass through? If this is the only place that could throw BufferUnderflowException in consumers/mirrormakers, then probably we do not need to wrap it; otherwise I would prefer to indicate where this exception is thrown. How about keeping it here for now and remove it later when we realize that this is really not necessary? > On March 10, 2014, 5:23 p.m., Jun Rao wrote: > > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala, > > lines 145-146 > > <https://reviews.apache.org/r/18299/diff/7/?file=514024#file514024line145> > > > > Do we actually hit the Overflow exception with this partition size? I have confirmed this in my tests with trace logging. > On March 10, 2014, 5:23 p.m., Jun Rao wrote: > > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala, > > lines 119-121 > > <https://reviews.apache.org/r/18299/diff/7/?file=514024#file514024line119> > > > > Could we also verify the content of the fetched messages? Ack. > On March 10, 2014, 5:23 p.m., Jun Rao wrote: > > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala, > > lines 170-171 > > <https://reviews.apache.org/r/18299/diff/7/?file=514024#file514024line170> > > > > Could we also verify the content of the fetched messages? Ack. > On March 10, 2014, 5:23 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/common/record/Record.java, lines > > 91-94 > > <https://reviews.apache.org/r/18299/diff/7/?file=514018#file514018line91> > > > > It's still weird to always write the data as uncompressed. In > > RecordTest, the compressed type is not really being exercised since the > > record is created as uncompressed. Perhaps we should just change the unit > > test. If the compression type is not none, we will have to decompress the > > value to get the uncompressed content. Added the comments for clearer explanation. - Guozhang ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/18299/#review36658 ----------------------------------------------------------- On March 8, 2014, 12:36 a.m., Guozhang Wang wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/18299/ > ----------------------------------------------------------- > > (Updated March 8, 2014, 12:36 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1253 > https://issues.apache.org/jira/browse/KAFKA-1253 > > > Repository: kafka > > > Description > ------- > > Incorporate Jun's comments round two > > Refactored compression logic into Compressor > > GZIP/Snappy integration tests > > > Diffs > ----- > > build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 1ac69436f117800815b8d50f042e9e2a29364b43 > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > 32e12ad149f6d70c96a498d0a390976f77bf9e2a > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > 616e1006f21c54af8260e84a96928cb8893ceb7c > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java > 038a05a94b795ec0a95b2d40a89222394b5a74c4 > > clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/record/CompressionType.java > 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 > clients/src/main/java/org/apache/kafka/common/record/Compressor.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java > 9d8935fa3beeb2a78b109a41ed76fd4374239560 > clients/src/main/java/org/apache/kafka/common/record/Record.java > f1dc9778502cbdfe982254fb6e25947842622239 > clients/src/main/java/org/apache/kafka/common/utils/Utils.java > 0c6b3656375721a718fb4de10118170aacce0ea9 > clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java > b0745b528cef929c4273f7e2ac4de1476cfc25ad > clients/src/test/java/org/apache/kafka/common/record/RecordTest.java > ae54d67da9907b0a043180c7395a1370b3d0528d > clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java > PRE-CREATION > clients/src/test/java/org/apache/kafka/test/TestUtils.java > 36cfc0fda742eb501af2c2c0330e3f461cf1f40c > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala > PRE-CREATION > core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala > c002f5ea38ece66ad559fadb18ffaf40ac2026aa > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala > 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 > > Diff: https://reviews.apache.org/r/18299/diff/ > > > Testing > ------- > > integration tests > > snappy dynamic load test > > > Thanks, > > Guozhang Wang > >