-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review33033
-----------------------------------------------------------

Overall, this looks great. Here are a few comments. I also have some more minor/stylistic comments (typos in comments, broken javadoc, code conventions, minor javadoc edits and such), which I would rather defer to a later, more final patch. I only did a cursory review of Protocol (which on the surface looks very promising) and did not review the unit tests or the metrics package.

WRT metrics I'm a bit ambivalent - i.e., my preference would be to first resolve whether you intend to switch to custom metrics or just stick with Coda Hale. The main benefits I see in custom metrics are full control over how metrics are computed (e.g., Coda Hale metrics, at least in the version we are using, makes it difficult or impossible to configure reservoir sample sizes, decay constants, reporting intervals, etc.) and a predictable API.

A lot of logging and stats are missing (e.g., request rates, callback execution time, ser-de time, etc.). I'm assuming these will be addressed in your final patch.


clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment62223>

This could add a couple of minutes to startup for producers that send to several topics (such as mirror-makers), since the metadata lookup (at startup) will be issued serially for each topic.


clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment62222>

IIRC our current producer allows re-partitioning to an available partition. We can probably do something similar on producer errors, but arguably that approach is an artifact of longer-than-ideal leader election latency.


clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment62233>

My reading of BufferPool - let me know if this is incorrect:
- For most producers, most allocations will be of batch-size. (And most batches will not entirely fill the batch-size buffer.)
- Batch size needs to be properly tuned:
  - If too low, then you (typically) end up with one of the following:
    - Messages exceed the batch size, and thus the poolable size, so you end up having to allocate on every record.
    - Messages fit, but you have more sends than (probably) necessary, given that you are sending smaller requests.
  - If too high, then you could end up wasting a lot of memory, especially for producers such as the mirror maker, which produces to several partitions.

It is an interesting approach, and I will think about it more. I would expect it to work very well for the 90% of producers that send to a few topics, but it may be problematic for large producers such as mirror makers. My intuition is that even very simple memory banks (i.e., a few levels of free lists) would address that without fully reimplementing malloc. E.g., if for each free list we were to maintain counts of the percentage of smaller-than-half allocations, we could if necessary create a free list of smaller poolable sizes and keep a global cap on the number of free lists.


clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/17263/#comment62219>

Another simple approach is to just attach the last send time to each topic-partition and sort in increasing order of last send time, although a more ideal heuristic would also take into account the actual data available.


clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/17263/#comment62237>

Would it be clearer to a user to have a separate "error callback" vs. a single callback? I.e., a user may want different handling for each scenario, and with only one callback has to "remember" to check hasError. This probably folds into the API discussion, which I have not yet looked over in great detail.

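To make the RecordBatch question above concrete, here is a minimal sketch of the two callback shapes being compared. All names (SingleCallback, SplitCallback, CallbackAdapters) are hypothetical and are not taken from the patch; the patch's actual Callback interface may well look different. It also shows that a split callback can be layered on top of a single one, so the two options need not be mutually exclusive.

```java
/** Hypothetical single-callback shape: the caller must remember to check the error. */
interface SingleCallback {
    // error is null on success (an assumption made for this sketch)
    void onCompletion(long offset, Exception error);
}

/** Hypothetical split shape: success and failure get separate methods. */
interface SplitCallback {
    void onSuccess(long offset);
    void onError(Exception error);
}

final class CallbackAdapters {
    private CallbackAdapters() {}

    /** Wraps a split callback so the error check lives in exactly one place. */
    static SingleCallback toSingle(final SplitCallback cb) {
        return new SingleCallback() {
            @Override
            public void onCompletion(long offset, Exception error) {
                if (error != null) {
                    cb.onError(error);
                } else {
                    cb.onSuccess(offset);
                }
            }
        };
    }
}
```

With an adapter like this, the producer internals could keep a single completion path while users who prefer separate handlers never have to write the error check themselves.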

clients/src/main/java/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/17263/#comment62226>

On retryable errors we should probably force a metadata refresh. This is separate from retries (which I don't see implemented in this patch). Are retries going to be built in, or is the client expected to handle retries?


clients/src/main/java/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/17263/#comment62236>

Why do we need a deque for in-flight requests? Given that there can be at most one outstanding request, a single reference should be sufficient - no?


clients/src/main/java/kafka/common/Node.java
<https://reviews.apache.org/r/17263/#comment62194>

Can cache the result (as you do for TopicPartition).


clients/src/main/java/kafka/common/StringSerialization.java
<https://reviews.apache.org/r/17263/#comment62192>

What is the motivation for wrapping encoding exceptions with KafkaException?


clients/src/main/java/kafka/common/config/AbstractConfig.java
<https://reviews.apache.org/r/17263/#comment62197>

... is currently unused :) Assuming you will incorporate this in the final patch.


clients/src/main/java/kafka/common/config/ConfigException.java
<https://reviews.apache.org/r/17263/#comment62205>

How will these UIDs be used?


clients/src/main/java/kafka/common/network/ByteBufferReceive.java
<https://reviews.apache.org/r/17263/#comment62208>

Unused


clients/src/main/java/kafka/common/network/ByteBufferReceive.java
<https://reviews.apache.org/r/17263/#comment62209>

remaining needs to be updated


clients/src/main/java/kafka/common/network/NetworkReceive.java
<https://reviews.apache.org/r/17263/#comment62212>

Seems to be an unnecessary constructor.


clients/src/main/java/kafka/common/network/Selector.java
<https://reviews.apache.org/r/17263/#comment62213>

Shouldn't we check earlier in this method if we have an established connection? Also, the containsKey check should take the id.

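As an illustration of the Node.java comment above ("Can cache the result"), here is a minimal sketch of result caching on an immutable value class. It assumes the result in question is hashCode(), and the class name CachedHashNode and its fields (id, host, port) are hypothetical rather than copied from the patch.

```java
/** Hypothetical immutable node; field names are assumed, not taken from the patch. */
final class CachedHashNode {
    private final int id;
    private final String host;
    private final int port;
    // Computed once in the constructor; safe only because every field is final.
    private final int hash;

    CachedHashNode(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
        int h = 31 + id;
        h = 31 * h + (host == null ? 0 : host.hashCode());
        h = 31 * h + port;
        this.hash = h;
    }

    @Override
    public int hashCode() {
        return hash; // no recomputation on map lookups
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CachedHashNode)) return false;
        CachedHashNode other = (CachedHashNode) o;
        return id == other.id
            && port == other.port
            && (host == null ? other.host == null : host.equals(other.host));
    }
}
```

The caching only pays off when the object is used as a key in hash-based collections on a hot path, and it is only correct while the fields that feed the hash never change.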

clients/src/main/java/kafka/common/network/Send.java
<https://reviews.apache.org/r/17263/#comment62210>

Not sure what the use-case for this would be.


clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
<https://reviews.apache.org/r/17263/#comment62214>

Unused class.


- Joel Koshy


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
>
> (Updated Jan. 23, 2014, 8:54 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
>
> Repository: kafka
>
> Description
> -------
>
> KAFKA-1227 New producer!
>
> Diffs
> -----
>
>   clients/build.sbt PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java PRE-CREATION
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java PRE-CREATION
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java PRE-CREATION
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/NetworkException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/RetryableException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/TimeoutException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java PRE-CREATION
>   clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java PRE-CREATION
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java PRE-CREATION
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e
>
> Diff: https://reviews.apache.org/r/17263/diff/
>
> Testing
> -------
>
> Thanks,
>
> Jay Kreps
>