-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review33033
-----------------------------------------------------------


Overall, this looks great. Here are a few comments - I have some more
minor/stylistic comments (typos in comments and broken javadoc, code
conventions, minor javadoc edits and such) which I would rather defer for a
later more final patch.

I only did a cursory review of Protocol (which I on the surface looks very
promising) and did not review the unit tests and metrics package. WRT
metrics I'm a bit ambivalent - i.e., my preference would be to resolve that
first if you intend to switch to custom metrics or just stick to coda hale.
The main benefits I see are full control over how metrics are computed
(e.g., coda hale metrics at least in the version we are using makes it
difficult/impossible to configure reservoir samples sizes, decay constants,
reporting intervals, etc.) and predictable API.

A lot of logging and stats are missing (e.g., request rates, callback
execution time, ser-de time, etc.). I'm assuming these will be addressed in
your final patch.



clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment62223>

    This could add a couple minutes startup for producers that send to several
    topics (such as mirror-makers) since metadata lookup (at startup) will be
    issued serially for each topic.
    



clients/src/main/java/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/17263/#comment62222>

    IIRC our current producer allows re-partitioning to an available partition.
    We can probably do something similar on producer errors, but arguably
    that approach is an artifact of longer than ideal leader election latency.
    



clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
<https://reviews.apache.org/r/17263/#comment62233>

    My reading of bufferPool - let me know if this is incorrect:
    - For most producers, most allocations will be of batch-size. (And most
      batches will not entirely fill the batch-size buffer).
    - Batch size needs to be properly tuned:
      - If too low, then you end up having (typically) one of the following:
        - Messages exceed batch size and thus poolable size and so you end up
          having to allocate on every record.
        - Messages fit but you have more sends than (probably) necessary given
          that you are sending smaller requests
      - If too high, then you could end up wasting a lot of memory especially
        for producers such as the mirror maker which produces to several
        partitions.
            
    It is an interesting approach, and I will think about it more. I would
    expect it to work very well for 90% of producers that send to a few topics,
    but it may be problematic for large producers such as mirror makers for
    which my intuition is that even very simple memory banks (i.e., a few levels
    of free lists) would address without fully reimplementing malloc. E.g., for
    each free list if we were to maintain counts on percentage of
    smaller-than-half allocations we could if necessary create a free list of
    smaller poolable sizes and keep a global cap on the number of free lists.
    



clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/17263/#comment62219>

    Another simple approach is to just attach the last send time to each
    topic-partition and sort this in increasing order of last send time although
    a more ideal heuristic would be to also take into account the actual data
    available.
    



clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/17263/#comment62237>

    Would it be clearer to a user to have an "error callback" vs. a single
    callback? i.e., a user may want different handling for either scenario and
    has to "remember" to check hasError if there is only one callback. This
    probably folds into the API discussion which I have not yet looked over in
    great detail.
    



clients/src/main/java/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/17263/#comment62226>

    On retryable errors we should probably force a metadata refresh. This is
    separate from retries (which I don't see implemented in this patch). Are
    retries going to be in-built or is the client expected to handle retries?
    
    



clients/src/main/java/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/17263/#comment62236>

    Why do we need a deque for inflight requests? Given that there can be at
    most one outstanding request a reference should have been sufficient - no?
    



clients/src/main/java/kafka/common/Node.java
<https://reviews.apache.org/r/17263/#comment62194>

    Can cache the result (as you do for TopicPartition).
    



clients/src/main/java/kafka/common/StringSerialization.java
<https://reviews.apache.org/r/17263/#comment62192>

    What is the motivation for wrapping encoding exceptions with KafkaException?
    



clients/src/main/java/kafka/common/config/AbstractConfig.java
<https://reviews.apache.org/r/17263/#comment62197>

    ... is currently unused :) Assuming you will incorporate this in the final
    patch.
    



clients/src/main/java/kafka/common/config/ConfigException.java
<https://reviews.apache.org/r/17263/#comment62205>

    How will these UIDs be used?
    



clients/src/main/java/kafka/common/network/ByteBufferReceive.java
<https://reviews.apache.org/r/17263/#comment62208>

    Unused



clients/src/main/java/kafka/common/network/ByteBufferReceive.java
<https://reviews.apache.org/r/17263/#comment62209>

    remaining needs to be updated



clients/src/main/java/kafka/common/network/NetworkReceive.java
<https://reviews.apache.org/r/17263/#comment62212>

    Seems to be an unnecessary constructor.



clients/src/main/java/kafka/common/network/Selector.java
<https://reviews.apache.org/r/17263/#comment62213>

    Shouldn't we check earlier in this method if we have an established
    connection?
    
    Also, the containsKey check should take the id.
    



clients/src/main/java/kafka/common/network/Send.java
<https://reviews.apache.org/r/17263/#comment62210>

    Not sure what the use-case for this would be.



clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java
<https://reviews.apache.org/r/17263/#comment62214>

    Unused class.


- Joel Koshy


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java
>  PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java 
> PRE-CREATION 
>   
> clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
>  PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>

Reply via email to