> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/Callback.java, line 14
> > <https://reviews.apache.org/r/17263/diff/1/?file=436443#file436443line14>
> >
> >     If the caller implements onCompletion to block for a while, the io 
> > thread would be blocked. Should this be mentioned somewhere in the 
> > interface?

I actually did mention that in the javadoc on that interface as well as in the 
producer javadoc. I considered the possibility of some kind of special thread 
pool for these but I think in most cases that would be overkill.
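
For a callback that really does need to do slow work, the caller can hand it 
off to its own executor. This is only a minimal sketch: CompletionCallback and 
OffloadingCallback are hypothetical names used for illustration, not the 
interface in this patch.

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for the producer's callback interface.
interface CompletionCallback {
    void onCompletion(long offset, Exception error);
}

// Wraps a slow callback so the calling (I/O) thread only pays for a hand-off.
final class OffloadingCallback implements CompletionCallback {
    private final Executor executor;
    private final CompletionCallback delegate;

    OffloadingCallback(Executor executor, CompletionCallback delegate) {
        this.executor = executor;
        this.delegate = delegate;
    }

    @Override
    public void onCompletion(long offset, Exception error) {
        // Return immediately; the delegate runs whenever the executor gets to it.
        executor.execute(() -> delegate.onCompletion(offset, error));
    }
}
```

The executor is supplied by the caller, so the producer itself would not need 
a thread pool for this.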


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 150
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line150>
> >
> >     I think what you meant here when you said "the callback will execute in 
> > the I/O thread of the producer and so should be reasonably fast." is that 
> > the callback should be implemented such that it returns immediately. This 
> > is not very obvious from the current description.

Hmm, you don't think so? I will specifically mention blocking.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 216
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line216>
> >
> >     I am not a big fan of this api. I started on these lines for another 
> > project and it soon made me reimplement most of codahale features which 
> > really is not worth it. I understand the reasoning behind this (not have 
> > dependency on external libraries and have any reporters written) but it 
> > becomes a pain to maintain, reimplement and stabilize. Also, clients would 
> > now have to understand how to use this data to implement metrics. Codahale 
> > already supports quite a few reporters. An alternative to the current 
> > approach is to take a list of reporters from the config and instantiate 
> > them in code using codahale reporters. The project is very active and has 
> > made quite a few optimizations recently which are useful. We could end up 
> > being handicapped because we may not have the right metrics and the only 
> > way to get them is to implement something on our own.

Yeah I think the metrics stuff is open to the accusation of reinventing the 
wheel. I will start a thread on this.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/RecordSend.java, line 34
> > <https://reviews.apache.org/r/17263/diff/1/?file=436451#file436451line34>
> >
> >     This seems a bit confusing. It would be useful to keep the usage of 
> > RecordSend in the callback and sync producer to be the same. Is there any 
> > reason that await throws on error when you have apis to check error?

I think I am confused. The usage in callback and directly is the same, no?

The question of whether to have await() throw an error or not is a good one. I 
think that is how futures in Java work. It also forces the caller to deal with 
the error, rather than making it possible to use the offset when there is an 
error. Basically I wanted to avoid the MongoDB getLastError() pattern, where 
errors are silent unless you explicitly look for them.
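
To illustrate the behavior I mean, here is a minimal sketch. SendResult is a 
hypothetical stand-in for RecordSend; its fields and method names are 
assumptions for illustration, not the class in this patch.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for RecordSend; names and fields are illustrative only.
final class SendResult {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile long offset = -1L;
    private volatile RuntimeException error;

    // Called once by the I/O thread when the request completes.
    void complete(long offset, RuntimeException error) {
        this.offset = offset;
        this.error = error;
        done.countDown();
    }

    // Non-throwing check; only meaningful after the send has completed.
    boolean hasError() {
        return error != null;
    }

    // Blocks until completion, then throws the stored error rather than
    // returning an offset that means nothing.
    long await() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for send", e);
        }
        if (error != null) {
            throw error;
        }
        return offset;
    }
}
```

A caller that ignores errors entirely still gets an exception from await(), 
which is the point.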


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, 
> > line 29
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line29>
> >
> >     A little more information on how availableMemory and free list are used 
> > would be useful to comment about.

Agreed.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, 
> > line 79
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line79>
> >
> >     Is this exception safe?

Hmm, I think what you are saying is that interrupting the thread may cause it 
to leak memory?


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java,
> >  line 87
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line87>
> >
> >     We can allocate a batch larger than the batchSize?

Yeah. This isn't terribly self-explanatory. The idea here is that you want to 
encourage medium-sized batches to avoid small sends. So maybe you set a batch 
size of 4k. If you get a 1k message, you allocate a 4k buffer, append it, and 
hope more messages show up. If you get a large message, you just allocate a 
buffer for it and send it right away. This lets us have no artificial bound on 
message size, but handle small messages efficiently.
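
As a sketch of that sizing rule (BatchSizing and allocationSize are 
hypothetical names for illustration, not the code under review):

```java
// Hypothetical helper; illustrates the sizing rule only.
final class BatchSizing {
    // Size of the buffer to allocate for a new batch whose first record
    // needs recordBytes. Small records get a full batch-sized buffer so
    // later records can be appended; oversized records get exactly what
    // they need.
    static int allocationSize(int batchSize, int recordBytes) {
        return Math.max(batchSize, recordBytes);
    }
}
```

With a 4k batch size, a 1k record gets a 4096-byte buffer and a 10000-byte 
record gets a 10000-byte buffer.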


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, 
> > line 80
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line80>
> >
> >     What happens if freeUp actually got memory from the free list? We still 
> > seem to create a new buffer with the given size. Why are we not returning 
> > from the pool? It looks like availableMemory can go negative here.

Let me know if this is just confusion about how the pool works. The idea is 
that I want to optimize the common case of allocations that match the batch 
size, but I still want to allow large allocations of any size. I also want to 
avoid implementing malloc, as you rightly pointed out. The compromise I came up 
with was to special-case the batch size and pool that size, since that should 
be 99% of the allocations. When messages come in that are bigger than this, I 
just allocate memory for them directly and don't attempt to pool it.

But this means you can have a bunch of buffers in the free list (which are 
always exactly batchSize) that use up all available memory. So if you get a 
custom allocation size you need to toss out some pooled buffers to ensure you 
don't exceed your memory limit. Let me know if that makes sense and isn't 
totally insane.
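
Here is a simplified sketch of that accounting. SimpleBufferPool is a 
hypothetical, single-threaded illustration: it throws when memory runs out 
and leaves out the blocking and locking that the real BufferPool needs.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, single-threaded sketch; not the BufferPool in this patch.
final class SimpleBufferPool {
    private final int poolableSize;   // the batch size; only this size is pooled
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long availableMemory;     // memory neither handed out nor in the free list

    SimpleBufferPool(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    ByteBuffer allocate(int size) {
        // Common case: reuse a pooled buffer of exactly the batch size.
        if (size == poolableSize && !free.isEmpty()) {
            return free.pollFirst();
        }
        // Otherwise toss out pooled buffers until the request fits the budget.
        while (availableMemory < size && !free.isEmpty()) {
            free.pollLast();
            availableMemory += poolableSize;
        }
        if (availableMemory < size) {
            throw new IllegalStateException("buffer memory exhausted");
        }
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);                  // keep for reuse
        } else {
            availableMemory += buffer.capacity();  // not pooled; leave it to the GC
        }
    }

    long availableMemory() { return availableMemory; }

    int pooledBuffers() { return free.size(); }
}
```

In this sketch availableMemory never goes negative, because pooled buffers 
are released before a custom-sized buffer is allocated.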


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java,
> >  line 108
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line108>
> >
> >     ready to be sent

fixed


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java,
> >  line 145
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line145>
> >
> >     I am assuming this method is called only by a single thread and hence 
> > there is no requirement to synchronize here

Yes, the ready()/drain() interface only works if you have a single I/O thread, 
which I do. Basically I don't really want to have a pool of I/O threads unless 
we have to.

The weird use of drainIndex in that method is because I want to kind of 
randomize which partitions get chosen if they all have tons of data. I don't 
want it to be the case that I get the whole max request size worth of data from 
a single partition every time, starving the other partitions.
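
A rough sketch of the rotation idea follows. RotatingDrainer is hypothetical 
and models each partition only by the number of bytes it has ready; it is not 
the drain() method in this patch. Like drain(), it assumes a single calling 
thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; partitions are represented only by their ready bytes.
final class RotatingDrainer {
    private int drainIndex = 0;

    // Returns the partition indexes chosen for one request of at most maxBytes.
    List<Integer> drain(int[] readyBytes, int maxBytes) {
        List<Integer> chosen = new ArrayList<>();
        int n = readyBytes.length;
        if (n == 0) {
            return chosen;
        }
        int start = drainIndex % n;
        int used = 0;
        for (int i = 0; i < n; i++) {
            int p = (start + i) % n;
            if (readyBytes[p] == 0) {
                continue;
            }
            // Always take at least one batch, then stop once the request is full.
            if (!chosen.isEmpty() && used + readyBytes[p] > maxBytes) {
                break;
            }
            chosen.add(p);
            used += readyBytes[p];
        }
        // Start one partition later next time so no partition is always first.
        drainIndex = (start + 1) % n;
        return chosen;
    }
}
```

With three partitions that each fill a whole request, successive calls pick 
partition 0, then 1, then 2, rather than partition 0 every time.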


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32800
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java
>  PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java 
> PRE-CREATION 
>   
> clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
>  PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>
