> On March 19, 2014, 3:17 a.m., Jay Kreps wrote: > > clients/src/main/java/org/apache/kafka/common/record/Compressor.java, line > > 103 > > <https://reviews.apache.org/r/18299/diff/14/?file=526588#file526588line103> > > > > These should not be swallowed. > > Guozhang Wang wrote: > The only reason I swallow them here is that IOExceptions should > theoretically never be thrown, since the underlying stream used a byte > buffer, which does not do any IO and hence not throwing IOException. But the > signature of the function does declare throws IOException, that is why I > swallow them here. I could also wrap it with the runtime KafkaException. Let > me know which one do you prefer.
Yeah please do wrap it in KafkaException or another RuntimeException and rethrow. The problem is that if we are wrong and this exception is thrown for some reason then we will be silently swallowing it and it will be hard to figure out what is going on... > On March 19, 2014, 3:17 a.m., Jay Kreps wrote: > > clients/src/main/java/org/apache/kafka/common/record/CompressionFactory.java, > > line 46 > > <https://reviews.apache.org/r/18299/diff/14/?file=526586#file526586line46> > > > > I don't think this is needed. Inside an if statement you should be able > > to refer to a class. The class only loads if that branch is taken. > > Guozhang Wang wrote: > Just tested and verified that if the library is imported in the class > file, then whenever that class is ever executed the library will be loaded. > So we have t use the class loader. Here was the test I did last night to verify prior to my comment: jkreps-mn:tmp jkreps$ cat Test.java class Test { public static void main(String[] args) throws Exception { if(args.length == 0) return; else new org.xerial.snappy.SnappyOutputStream(System.out); } }jkreps-mn:tmp jkreps$ javac -cp /Users/jkreps/.m2/repository/org/xerial/snappy/snappy-java/1.0.3.2/snappy-java-1.0.3.2.jar Test.java jkreps-mn:tmp jkreps$ java Test jkreps-mn:tmp jkreps$ java Test 0 Exception in thread "main" java.lang.NoClassDefFoundError: org/xerial/snappy/SnappyOutputStream at Test.main(Test.java:8) Caused by: java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream at java.net.URLClassLoader$1.run(URLClassLoader.java:202) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) ... 1 more > On March 19, 2014, 3:17 a.m., Jay Kreps wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java, > > line 58 > > <https://reviews.apache.org/r/18299/diff/14/?file=526582#file526582line58> > > > > What is the reason for this check? > > Guozhang Wang wrote: > There is a corner case that the heuristics used to estimate the written > bytes would return a value larger than the capacity already if we use the > compressor's buffer size. For now since I removed this heuristics this may > not hit, but I think it is better leaving it here in case we change to some > other heuristics in the future. If it isn't being used let's remove it. If you want to keep it it needs to be intelligible. I.e. a person reading the code needs to know why that check needs to be there. Even with your explanation I don't really get it. Maybe a comment or something? - Jay ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/18299/#review37679 ----------------------------------------------------------- On March 19, 2014, 11:32 p.m., Guozhang Wang wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/18299/ > ----------------------------------------------------------- > > (Updated March 19, 2014, 11:32 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1253 > https://issues.apache.org/jira/browse/KAFKA-1253 > > > Repository: kafka > > > Description > ------- > > Incorporated Jay's comments > > In-place compression with > > 1) Dynamic reallocation in the underlying byte buffer > 2) Written bytes estimate to reduce reallocation probabilities > 3) Deallocation in buffer pool following the original capacity > > > Diffs > ----- > > build.gradle 84fa0d6b5f7405af755c5d7ff7bdd7592bb8668f > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 1ac69436f117800815b8d50f042e9e2a29364b43 > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > 32e12ad149f6d70c96a498d0a390976f77bf9e2a > > clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java > b69866a9fb9a8b4e1e78d304a20eda3cbf178c6f > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > 673b2962771c28ceb3c7a6c0fd6f69521bd7ed16 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java > 038a05a94b795ec0a95b2d40a89222394b5a74c4 > > clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java > 3ebbb804242be6a001b3bae6524afccc85a87602 > > clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/record/CompressionType.java > 906da02d02c03aadd8ab73ed2fc9a1898acb8d72 > clients/src/main/java/org/apache/kafka/common/record/Compressor.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java > 9d8935fa3beeb2a78b109a41ed76fd4374239560 > clients/src/main/java/org/apache/kafka/common/record/Record.java > f1dc9778502cbdfe982254fb6e25947842622239 > clients/src/main/java/org/apache/kafka/common/utils/Crc32.java > 153c5a6d345293aa0ba2cf513373323a6e9f2467 > clients/src/main/java/org/apache/kafka/common/utils/Utils.java > 0c6b3656375721a718fb4de10118170aacce0ea9 > clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java > b0745b528cef929c4273f7e2ac4de1476cfc25ad > clients/src/test/java/org/apache/kafka/common/record/RecordTest.java > ae54d67da9907b0a043180c7395a1370b3d0528d > clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java > PRE-CREATION > clients/src/test/java/org/apache/kafka/test/TestUtils.java > 36cfc0fda742eb501af2c2c0330e3f461cf1f40c > core/src/main/scala/kafka/producer/ConsoleProducer.scala > dd39ff22c918fe5b05f04582b748e32349b2055f > core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala > PRE-CREATION > core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala > c002f5ea38ece66ad559fadb18ffaf40ac2026aa > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala > 66ea76b9b6c0f8839f715c845fb9b9671b8f35c1 > perf/src/main/scala/kafka/perf/ProducerPerformance.scala > 3df0d130308a35fca96184adc21eeee2eea1488f > > Diff: https://reviews.apache.org/r/18299/diff/ > > > Testing > ------- > > integration tests > > unit tests > > stress tests (1K message size, 1M messages in producer performance with ack = > 1, linger time = 0ms/500ms, random bit/all-ones messages) > > snappy dynamic load test > > > Thanks, > > Guozhang Wang > >