> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > Overall, this looks great. Here are a few comments - I have some more
> > minor/stylistic comments (typos in comments and broken javadoc, code
> > conventions, minor javadoc edits and such) which I would rather defer for a
> > later more final patch.
> > 
> > I only did a cursory review of Protocol (which on the surface looks very
> > promising) and did not review the unit tests and metrics package. WRT
> > metrics I'm a bit ambivalent - i.e., my preference would be to resolve that
> > first if you intend to switch to custom metrics or just stick to coda hale.
> > The main benefits I see are full control over how metrics are computed
> > (e.g., coda hale metrics at least in the version we are using makes it
> > difficult/impossible to configure reservoir samples sizes, decay constants,
> > reporting intervals, etc.) and predictable API.
> > 
> > A lot of logging and stats are missing (e.g., request rates, callback
> > execution time, ser-de time, etc.). I'm assuming these will be addressed in
> > your final patch.
> >

Yes, I held off on adding metrics and stats until we agree on how to do that.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 182
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line182>
> >
> >     This could add a couple minutes startup for producers that send to
> >     several topics (such as mirror-makers) since metadata lookup (at
> >     startup) will be issued serially for each topic.
> >

Wait, why? Shouldn't metadata fetch be very fast?


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 186
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line186>
> >
> >     IIRC our current producer allows re-partitioning to an available
> >     partition. We can probably do something similar on producer errors,
> >     but arguably that approach is an artifact of longer than ideal leader
> >     election latency.
> >

You can kind of do this. That is, you can mimic the 0.7 behavior by having a 
partitioner that only partitions to available partitions irrespective of key. 
Since there are no retries yet, I think this is effectively as good as it gets.

I am a little averse to the idea of some kind of -1 partition that is handled 
magically by the producer, but that is also an option.
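
As a rough sketch of the "available partitions only" idea: the class below 
round-robins over partitions that currently have a leader and ignores the key. 
PartitionState and AvailablePartitioner are hypothetical names invented for 
this illustration; they are not the Partitioner interface or PartitionInfo 
class from the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for per-partition metadata (not the patch's PartitionInfo).
final class PartitionState {
    final int id;
    final boolean hasLeader;

    PartitionState(int id, boolean hasLeader) {
        this.id = id;
        this.hasLeader = hasLeader;
    }
}

// Round-robins over partitions that currently have a leader, ignoring the key.
final class AvailablePartitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    int partition(List<PartitionState> partitions) {
        List<PartitionState> available = new ArrayList<>();
        for (PartitionState p : partitions) {
            if (p.hasLeader) {
                available.add(p);
            }
        }
        // If nothing is available, fall back to all partitions so the send
        // fails through the normal error path instead of here.
        List<PartitionState> candidates = available.isEmpty() ? partitions : available;
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("topic has no partitions");
        }
        // Mask the sign bit so the index stays non-negative after overflow.
        int next = counter.getAndIncrement() & 0x7fffffff;
        return candidates.get(next % candidates.size()).id;
    }
}
```

Because the key is ignored, this gives up key-based ordering in exchange for 
availability, which is the same trade-off the 0.7 behavior made.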


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, 
> > line 21
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line21>
> >
> >     My reading of bufferPool - let me know if this is incorrect:
> >     - For most producers, most allocations will be of batch-size. (And most
> >       batches will not entirely fill the batch-size buffer).
> >     - Batch size needs to be properly tuned:
> >       - If too low, then you end up having (typically) one of the following:
> >         - Messages exceed batch size and thus poolable size and so you end
> >           up having to allocate on every record.
> >         - Messages fit but you have more sends than (probably) necessary
> >           given that you are sending smaller requests
> >       - If too high, then you could end up wasting a lot of memory
> >         especially for producers such as the mirror maker which produces to
> >         several partitions.
> >
> >     It is an interesting approach, and I will think about it more. I would
> >     expect it to work very well for 90% of producers that send to a few
> >     topics, but it may be problematic for large producers such as mirror
> >     makers for which my intuition is that even very simple memory banks
> >     (i.e., a few levels of free lists) would address without fully
> >     reimplementing malloc. E.g., for each free list if we were to maintain
> >     counts on percentage of smaller-than-half allocations we could if
> >     necessary create a free list of smaller poolable sizes and keep a global
> >     cap on the number of free lists.
> >

Yeah I originally shared a lot of your concerns, but consider a few additional 
things:
1. I don't think that for large messages doing an allocation and send 
per-message is a problem because the sending of bytes will dominate, i.e., I 
think we could hard code a batch size of 16k and it would be close to optimal 
for most producers. Basically the goal is to avoid small allocations/sends, not 
to avoid per-message allocations/sends.
2. The memory fragmentation problem is not as bad as it seems. Keep in mind 
that each Java object has significant overhead. Keeping around a list of 
ProducerRecord instances is actually pretty pricey if you sit and add up the 
actual per-message overheads. I think malloc/new must also have internal 
fragmentation; that is, when you ask for 15 bytes you often get more than 15 
bytes. I'm not convinced this strategy is that much worse.
3. One optimization that Neha pointed out that would improve things would be to 
take all ready batches for a partition. I think this could be added later.
4. With a 16k batch size and 1000 partitions you only need 16M of buffer space, 
which is not unreasonable, so I think this may work even for mirror-maker-like 
cases.

That said, if you have some thoughts on how the pooling could work I would be 
interested to hear them. I considered something like what you describe, where I 
pick a few sizes to pool at, but it definitely adds some complications and 
corner cases, so the challenge is to algorithmically work through some of these.
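
To make the pooling trade-off concrete, here is a simplified sketch of a 
single-free-list pool: buffers of exactly the poolable (batch) size are 
recycled, and any other size is allocated directly and simply returned to the 
memory budget on release. SimpleBufferPool and its methods are hypothetical 
names for illustration, and this version throws when memory runs out rather 
than reproducing whatever the patch's BufferPool does in that case.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified sketch: one free list of batch-size buffers under a total memory cap.
final class SimpleBufferPool {
    private final int poolableSize;
    private long availableMemory; // bytes neither handed out nor held in the free list
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    SimpleBufferPool(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    synchronized ByteBuffer allocate(int size) {
        // Fast path: reuse a pooled buffer when the request is exactly batch-size.
        if (size == poolableSize && !free.isEmpty()) {
            return free.pollFirst();
        }
        if (unused() < size) {
            throw new IllegalStateException("pool exhausted"); // assumption: fail fast
        }
        // Release pooled buffers back to the budget until the request fits.
        while (availableMemory < size) {
            free.pollLast();
            availableMemory += poolableSize;
        }
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    synchronized void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addFirst(buffer); // recycle batch-size buffers
        } else {
            availableMemory += buffer.capacity(); // odd sizes go back to the budget
        }
    }

    // Total bytes not currently handed out to callers.
    synchronized long unused() {
        return availableMemory + (long) free.size() * poolableSize;
    }
}
```

With a poolable size of 16 KB, 1000 partitions each holding one batch come to 
16,000 KB, which is the roughly 16M figure in point 4.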


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java,
> >  line 150
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line150>
> >
> >     Another simple approach is to just attach the last send time to each
> >     topic-partition and sort this in increasing order of last send time
> >     although a more ideal heuristic would be to also take into account the
> >     actual data available.
> >

Yeah this is interesting. What I interpret you to be saying is to sort all the 
batches by the created timestamp on the record batch, which corresponds to the 
first message enqueue.

The advantage of this is that instead of round-robining in the check we would 
always choose the partition which has the oldest data.

Avoiding the sort on each iteration might require some kind of heap.

This would be a good optimization.
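
A minimal sketch of the heap idea, using java.util.PriorityQueue keyed on the 
batch creation time so the oldest batch is always at the head. PendingBatch 
and OldestFirstQueue are hypothetical names for illustration; they are not the 
RecordBatch or RecordAccumulator classes from the patch.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical stand-in for a record batch: which partition, and when it was created.
final class PendingBatch {
    final String topicPartition;
    final long createdMs;

    PendingBatch(String topicPartition, long createdMs) {
        this.topicPartition = topicPartition;
        this.createdMs = createdMs;
    }
}

// Min-heap on creation time: add and poll are O(log n), so no per-iteration sort.
final class OldestFirstQueue {
    private final PriorityQueue<PendingBatch> heap =
        new PriorityQueue<>(Comparator.comparingLong((PendingBatch b) -> b.createdMs));

    void add(PendingBatch batch) {
        heap.add(batch);
    }

    // Returns the batch with the oldest data, or null if nothing is pending.
    PendingBatch pollOldest() {
        return heap.poll();
    }

    boolean isEmpty() {
        return heap.isEmpty();
    }
}
```

This only orders by age. Taking the amount of available data into account, as 
suggested in the review comment, would need a different comparator.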


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java, 
> > line 81
> > <https://reviews.apache.org/r/17263/diff/1/?file=436456#file436456line81>
> >
> >     Would it be clearer to a user to have an "error callback" vs. a single
> >     callback? i.e., a user may want different handling for either scenario
> >     and has to "remember" to check hasError if there is only one callback.
> >     This probably folds into the API discussion which I have not yet looked
> >     over in great detail.
> >

The error will be thrown by await() or any accessor method on the send. There 
was some discussion on this as part of the API discussion...
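
To illustrate the "accessors throw" behavior, here is a sketch of a send 
result where await() and offset() rethrow the stored error, so a caller who 
forgets to check hasError() still sees the failure. SendResult and done() are 
hypothetical names for illustration; this is not the RecordSend class from the 
patch.

```java
import java.util.concurrent.CountDownLatch;

// Sketch of a future-like send result whose accessors surface the send error.
final class SendResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile long offset = -1L;
    private volatile RuntimeException error;

    // Called once by the I/O thread when the response (or failure) arrives.
    void done(long offset, RuntimeException error) {
        this.offset = offset;
        this.error = error;
        latch.countDown();
    }

    // Blocks until the send completes; throws if the send failed.
    SendResult await() {
        block();
        if (error != null) {
            throw error;
        }
        return this;
    }

    // Accessors go through await(), so they throw on failure too.
    long offset() {
        return await().offset;
    }

    // Lets callers check for failure without triggering the exception.
    boolean hasError() {
        block();
        return error != null;
    }

    private void block() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for send", e);
        }
    }
}
```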


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/Sender.java, line 299
> > <https://reviews.apache.org/r/17263/diff/1/?file=436457#file436457line299>
> >
> >     On retryable errors we should probably force a metadata refresh. This is
> >     separate from retries (which I don't see implemented in this patch). Are
> >     retries going to be in-built or is the client expected to handle
> >     retries?
> >

I don't think we have decided about retries.

This is a good point about re-fetching metadata. I don't believe I fully 
understand when metadata refetch should happen.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/Sender.java, line 448
> > <https://reviews.apache.org/r/17263/diff/1/?file=436457#file436457line448>
> >
> >     Why do we need a deque for inflight requests? Given that there can be at
> >     most one outstanding request a reference should have been sufficient -
> >     no?
> >

There can be only one request being written at any time but there can be any 
number of in-flight requests.
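
A small sketch of why a deque fits here: each request is pushed when it is 
sent and the oldest one is popped when its response arrives, so several can be 
outstanding at once. InFlightRequests is a hypothetical class for 
illustration, and it assumes responses on one connection come back in the 
order the requests were sent.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Tracks correlation ids of requests that were sent but not yet answered.
final class InFlightRequests {
    private final Deque<Integer> pending = new ArrayDeque<>();

    // Newest request goes to the front; the oldest sits at the back.
    void sent(int correlationId) {
        pending.addFirst(correlationId);
    }

    // Assumption: responses arrive in send order, so the oldest must match.
    void completeNext(int correlationId) {
        Integer oldest = pending.peekLast();
        if (oldest == null || oldest != correlationId) {
            throw new IllegalStateException(
                "expected response for " + oldest + " but got " + correlationId);
        }
        pending.pollLast();
    }

    int size() {
        return pending.size();
    }
}
```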


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/Node.java, line 41
> > <https://reviews.apache.org/r/17263/diff/1/?file=436465#file436465line41>
> >
> >     Can cache the result (as you do for TopicPartition).
> >

Agreed.
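
For reference, a sketch of the caching pattern for an immutable value class: 
compute the hash on first use and store it in a field. HostPort is a 
hypothetical class for illustration, not the Node class from the patch, and 
the hash formula is an arbitrary choice.

```java
// Immutable value class that computes its hash code once and reuses it.
final class HostPort {
    private final String host;
    private final int port;
    private int hash; // 0 means "not computed yet"

    HostPort(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public int hashCode() {
        int h = hash;
        if (h == 0) {
            h = 31 * host.hashCode() + port;
            hash = h; // benign race: every thread computes the same value
        }
        return h;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof HostPort)) {
            return false;
        }
        HostPort other = (HostPort) o;
        return port == other.port && host.equals(other.host);
    }
}
```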


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/StringSerialization.java, line 40
> > <https://reviews.apache.org/r/17263/diff/1/?file=436468#file436468line40>
> >
> >     What is the motivation for wrapping encoding exceptions with
> >     KafkaException?
> >

The encoding exception is a checked exception. I guess the question is whether 
the serializer should be able to throw checked exceptions and then what we 
should do with those, given that send doesn't throw any.
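
The wrapping in question looks roughly like the sketch below: 
String.getBytes(String) declares the checked UnsupportedEncodingException, and 
the serializer rethrows it as an unchecked exception with the original as the 
cause. SerializationFailure and StringToBytes are hypothetical names for 
illustration; they are not the KafkaException or StringSerialization classes 
from the patch.

```java
import java.io.UnsupportedEncodingException;

// Hypothetical unchecked wrapper (stand-in for the patch's KafkaException).
final class SerializationFailure extends RuntimeException {
    // Declared only because Throwable is Serializable and javac warns without it.
    private static final long serialVersionUID = 1L;

    SerializationFailure(String message, Throwable cause) {
        super(message, cause);
    }
}

// Converts strings to bytes without exposing a checked exception to callers.
final class StringToBytes {
    private final String encoding;

    StringToBytes(String encoding) {
        this.encoding = encoding;
    }

    byte[] toBytes(String value) {
        try {
            return value.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            // Keep the checked exception as the cause so nothing is lost.
            throw new SerializationFailure("Unsupported encoding: " + encoding, e);
        }
    }
}
```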


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/config/AbstractConfig.java, line 67
> > <https://reviews.apache.org/r/17263/diff/1/?file=436470#file436470line67>
> >
> >     ... is currently unused :) Assuming you will incorporate this in the
> >     final patch.
> >

Yeah...


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/config/ConfigException.java, line 10
> > <https://reviews.apache.org/r/17263/diff/1/?file=436472#file436472line10>
> >
> >     How will these UIDs be used?
> >

Basically Exception is a serializable class so you get a warning if you don't 
have a serialVersionUID. I sincerely hope no one will serialize our exceptions.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/network/NetworkReceive.java, line 17
> > <https://reviews.apache.org/r/17263/diff/1/?file=436510#file436510line17>
> >
> >     Seems to be an unnecessary constructor.

I was using that in the unit tests to fake responses.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/network/Selector.java, line 102
> > <https://reviews.apache.org/r/17263/diff/1/?file=436514#file436514line102>
> >
> >     Shouldn't we check earlier in this method if we have an established
> >     connection?
> >     
> >     Also, the containsKey check should take the id.
> >

Good point.


> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/common/network/Send.java, line 30
> > <https://reviews.apache.org/r/17263/diff/1/?file=436515#file436515line30>
> >
> >     Not sure what the use-case for this would be.

It is sort of a complicated and unfinished thought. Basically if you want to 
carry the ByteBuffers in the request straight through to the network send 
without recopying, you need a way to reach around the abstraction. I'll wrap 
that up and either use and explain it or remove it.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review33033
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java
>  PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java 
> PRE-CREATION 
>   
> clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
>  PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>
