-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32800
-----------------------------------------------------------


clients/src/main/java/kafka/clients/producer/Callback.java
<https://reviews.apache.org/r/17263/#comment61778>

    If the caller implements onCompletion so that it blocks for a while, the I/O thread would be blocked. Should this be mentioned somewhere in the interface?


clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment61779>

    "you can easily achieve"


clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment61780>

    I think what you meant when you said "the callback will execute in the I/O thread of the producer and so should be reasonably fast." is that the callback should be implemented such that it returns immediately. This is not very obvious from the current description.


clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment61781>

    I am not a big fan of this API. I started along these lines for another project, and it soon had me reimplementing most of Codahale's features, which really is not worth it. I understand the reasoning behind this (no dependency on external libraries, and any reporter can be written), but it becomes a pain to maintain, reimplement and stabilize. Also, clients would now have to understand how to use this data to implement metrics. Codahale already supports quite a few reporters. An alternative to the current approach is to take a list of reporters from the config and instantiate them in code using Codahale reporters. The project is very active and has made quite a few useful optimizations recently. We could end up handicapped because we may not have the right metrics, and the only way to get them would be to implement something on our own.

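
    Coming back to the Callback.java and I/O thread comments above: one way a caller could keep onCompletion fast is to hand slow work off to an executor, so the I/O thread only enqueues a task. This is a minimal sketch, not code from the patch. SendCallback is a hypothetical stand-in for the callback interface (its signature is an assumption), and handOff and demo are names made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallbackHandoffSketch {

    /** Hypothetical stand-in for the producer callback; NOT the signature in the patch. */
    interface SendCallback {
        void onCompletion(long offset, Exception error);
    }

    /**
     * Wraps a possibly slow callback so that the thread invoking onCompletion
     * (the I/O thread, in the producer's case) only submits a task and returns.
     */
    static SendCallback handOff(SendCallback slow, ExecutorService executor) {
        return (offset, error) -> executor.execute(() -> slow.onCompletion(offset, error));
    }

    /**
     * Simulates the I/O thread firing the callback from the current thread.
     * Returns the name of the thread that actually ran the slow work.
     */
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // A deliberately slow callback: it would stall the I/O thread if run inline.
        SendCallback slow = (offset, error) -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        };

        // The caller's thread only pays for the enqueue, not for the sleep.
        handOff(slow, executor).onCompletion(42L, null);

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("slow callback ran on: " + demo());
    }
}
```

    The trade-off is that callbacks no longer run in completion order unless the executor is single-threaded, and an unbounded executor queue can hide a slow consumer, so the interface documentation would still need to say which thread invokes the callback.
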

clients/src/main/java/kafka/clients/producer/Partitioner.java
<https://reviews.apache.org/r/17263/#comment61782>

    make use of the key


clients/src/main/java/kafka/clients/producer/RecordSend.java
<https://reviews.apache.org/r/17263/#comment61783>

    This seems a bit confusing. It would be useful to keep the usage of RecordSend the same in the callback and the sync producer. Is there any reason that await throws on error when there are APIs to check for an error?


clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment61786>

    A little more information on how availableMemory and the free list are used would be useful in the comments.


clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment61784>

    Is this exception safe?


clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment61785>

    What happens if freeUp actually got memory from the free list? We still seem to create a new buffer with the given size. Why are we not returning one from the pool? It looks like availableMemory can go negative here.


clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/17263/#comment61787>

    We can allocate a batch larger than the batchSize?


clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/17263/#comment61788>

    ready to be sent


clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/17263/#comment61789>

    I am assuming this method is called only by a single thread, and hence there is no requirement to synchronize here.


clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/17263/#comment61790>

    magic number


- Sriram Subramanian


On Jan.
23, 2014, 8:54 p.m., Jay Kreps wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
>
> (Updated Jan. 23, 2014, 8:54 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1227 New producer!
>
>
> Diffs
> -----
>
>   clients/build.sbt PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e
>
> Diff: https://reviews.apache.org/r/17263/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jay Kreps
>
>