> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/Callback.java, line 14
> > <https://reviews.apache.org/r/17263/diff/1/?file=436443#file436443line14>
> >
> > If the caller implements onCompletion to block for a while, the io
> > thread would be blocked. Should this be mentioned somewhere in the
> > interface?

I actually did mention that in the javadoc on that interface as well as in
the producer javadoc. I considered the possibility of some kind of special
thread pool for these, but I think in most cases that would be overkill.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 150
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line150>
> >
> > I think what you meant here when you said "the callback will execute in
> > the I/O thread of the producer and so should be reasonably fast." is that
> > the callback should be implemented such that it returns immediately. This
> > is not very obvious from the current description.

Hmm, you don't think so? I will specifically mention blocking.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 216
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line216>
> >
> > I am not a big fan of this api. I started on these lines for another
> > project and it soon made me reimplement most of codahale features which
> > really is not worth it. I understand the reasoning behind this (not have
> > dependency on external libraries and have any reporters written) but it
> > becomes a pain to maintain, reimplement and stabilize. Also, clients would
> > now have to understand how to use this data to implement metrics. Codahale
> > already supports quite a few reporters. An alternative to the current
> > approach is to take a list of reporters from the config and instantiate
> > them in code using codahale reporters. The project is very active and have
> > done quite a few optimizations recently which are useful. We could end up
> > being handicapped because we may not have the right metrics and only way to
> > get it is to implement something on our own.

Yeah, I think the metrics stuff is open to the accusation of reinventing the
wheel.
I will start a thread on this.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/RecordSend.java, line 34
> > <https://reviews.apache.org/r/17263/diff/1/?file=436451#file436451line34>
> >
> > This seems a bit confusing. It would be useful to keep the usage of
> > RecordSend in the callback and sync producer to be the same. Is there any
> > reason that await throws on error when you have apis to check error?

I think I am confused. The usage in the callback and when used directly is
the same, no? The question of whether await() should throw on error is a good
one. I think that is how futures in Java work. Throwing also forces the
caller to deal with the error, rather than making it possible to use the
offset when there is an error. Basically, I wanted to avoid the MongoDB
getLastError() situation, where errors are silent unless you explicitly look
for them.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java,
> > line 29
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line29>
> >
> > A little more information on how availableMemory and free list are used
> > would be useful to comment about.

Agreed.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java,
> > line 79
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line79>
> >
> > Is this exception safe?

Hmm, I think what you are saying is that interrupting the thread may cause it
to leak memory?


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java,
> > line 87
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line87>
> >
> > We can allocate a batch larger than the batchSize?

Yeah. This isn't terribly self-explanatory. The idea here is that you want to
encourage medium-sized batches to avoid small sends.
So maybe you set a batch size of 4k. If you get a 1k message, you allocate a
4k buffer and append it and hope more messages show up. If you get a large
message, you just allocate a buffer for it and send it right away. This lets
us have no artificial bound on message size, but handle small messages
efficiently.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java,
> > line 80
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line80>
> >
> > What happens if freeUp actually got memory from the free list? We still
> > seem to create a new buffer with the given size. Why are we not returning
> > from the pool? It looks like availableMemory can go negative here.

Let me know if this is just confusion about how the pool works. The idea is
that I want to optimize the common case of allocations that match the batch
size, but I still want to allow large allocations of any size. I also want to
avoid implementing malloc, as you rightly pointed out. The compromise I came
up with was to special-case the batch size and pool that size, since that
should be 99% of the allocations. When messages come in that are bigger than
this, I just directly allocate memory for them and don't attempt to pool it.
But this means you can have a bunch of buffers in the free list (which are
always exactly batchSize) that use up all available memory. So if you get a
custom allocation size, you need to toss out some pooled buffers to ensure you
don't exceed your memory limit. Let me know if that makes sense and isn't
totally insane.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java,
> > line 108
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line108>
> >
> > ready to be sent

Fixed.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java,
> > line 145
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line145>
> >
> > I am assuming this method is called only by a single thread and hence
> > there is no requirement to synchronize here

Yes, the ready()/drain() interface only works if you have a single I/O thread,
which I do. Basically, I don't really want to have a pool of I/O threads
unless we have to. The weird use of drainIndex in that method is because I
want to kind of randomize which partitions get chosen if they all have tons of
data. I don't want it to be the case that I get the whole max request size
worth of data from a single partition every time, starving the other
partitions.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32800
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
>
> (Updated Jan. 23, 2014, 8:54 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1227 New producer!
> > > Diffs > ----- > > clients/build.sbt PRE-CREATION > clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/KafkaProducer.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/ProducerConfig.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/ProducerRecord.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/Metadata.java > PRE-CREATION > > clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java > PRE-CREATION > > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/Sender.java > PRE-CREATION > clients/src/main/java/kafka/clients/tools/ProducerPerformance.java > PRE-CREATION > clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION > clients/src/main/java/kafka/common/Cluster.java PRE-CREATION > clients/src/main/java/kafka/common/Configurable.java PRE-CREATION > clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION > clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION > clients/src/main/java/kafka/common/Metric.java PRE-CREATION > clients/src/main/java/kafka/common/Node.java PRE-CREATION > 
clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION > clients/src/main/java/kafka/common/Serializer.java PRE-CREATION > clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION > clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION > clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION > clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION > clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION > clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION > clients/src/main/java/kafka/common/errors/CorruptMessageException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/MessageTooLargeException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/NetworkException.java > PRE-CREATION > > clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/RetryableException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/TimeoutException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/UnknownServerException.java > PRE-CREATION > > clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MetricConfig.java 
PRE-CREATION > clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MetricsReporter.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/QuotaViolationException.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Histogram.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Percentile.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Percentiles.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/SampledStat.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java > PRE-CREATION > clients/src/main/java/kafka/common/network/ByteBufferReceive.java > PRE-CREATION > clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION > clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION > clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION > clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION > clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION > clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION > clients/src/main/java/kafka/common/network/Send.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION > 
clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/SchemaException.java > PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION > clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION > clients/src/main/java/kafka/common/record/InvalidRecordException.java > PRE-CREATION > clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION > clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION > clients/src/main/java/kafka/common/record/Record.java PRE-CREATION > clients/src/main/java/kafka/common/record/Records.java PRE-CREATION > clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION > clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION > clients/src/main/java/kafka/common/requests/ResponseHeader.java > PRE-CREATION > clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION > clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION > clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION > clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION > clients/src/test/java/kafka/clients/common/network/SelectorTest.java > PRE-CREATION > 
clients/src/test/java/kafka/clients/producer/BufferPoolTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION > clients/src/test/java/kafka/clients/producer/MockProducerTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/RecordSendTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION > clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION > clients/src/test/java/kafka/common/metrics/JmxReporterTest.java > PRE-CREATION > clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION > clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java > PRE-CREATION > > clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java > PRE-CREATION > clients/src/test/java/kafka/common/record/MemoryRecordsTest.java > PRE-CREATION > clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION > clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java > PRE-CREATION > clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION > clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION > clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION > clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION > clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION > project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e > > Diff: https://reviews.apache.org/r/17263/diff/ > > > Testing > ------- > > > Thanks, > > Jay Kreps > >
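
For readers following the BufferPool discussion in the reply above, here is a
rough sketch of the pooling compromise as described there: only buffers of
exactly batchSize are pooled, other sizes are allocated directly, and pooled
buffers are tossed when an odd-sized request would otherwise exceed the memory
limit. This is not the code in the patch. The class name SketchBufferPool, its
method names, the accounting (availableMemory excludes pooled buffers), and the
choice to throw rather than block when memory runs out are all assumptions
made for illustration, and the sketch is single-threaded.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration only; not the BufferPool under review.
class SketchBufferPool {
    private final int batchSize;   // the one buffer size that gets pooled
    private long availableMemory;  // bytes neither handed out nor held in the free list
    private final Deque<ByteBuffer> free = new ArrayDeque<>(); // each exactly batchSize

    SketchBufferPool(long totalMemory, int batchSize) {
        this.availableMemory = totalMemory;
        this.batchSize = batchSize;
    }

    ByteBuffer allocate(int size) {
        // Common case: a batch-sized request is served straight from the free list.
        if (size == batchSize && !free.isEmpty())
            return free.pollFirst();
        // Otherwise toss pooled buffers until the request fits within the limit.
        while (availableMemory < size && !free.isEmpty()) {
            free.pollLast();
            availableMemory += batchSize;
        }
        // Assumption: fail fast here; a real pool might block until memory is returned.
        if (availableMemory < size)
            throw new IllegalStateException("not enough memory for " + size + " bytes");
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == batchSize) {
            buffer.clear();
            free.addLast(buffer); // still counted against the limit while pooled
        } else {
            availableMemory += buffer.capacity(); // odd sizes are never pooled
        }
    }

    long availableMemory() { return availableMemory; }

    int pooledBuffers() { return free.size(); }
}
```

Under this sketch, a caller batching a 1k message with a 4k batch size would
request max(batchSize, messageSize) bytes, so small messages share a pooled 4k
buffer while an oversized message gets a one-off buffer of its own size.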