> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote: > > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, > > line 80 > > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line80> > > > > What happens if freeUp actually got memory from the free list? We still > > seem to create a new buffer with the given size. Why are we not returning > > from the pool? It looks like availableMemory can go negative here. > > Jay Kreps wrote: > Let me know if this is just confusion about how the pool works. The idea > is that I want to optimize the common case of allocations that match the > batch size, but I still want to allow large allocations of any size. I also > want to avoid implementing malloc, as you rightly pointed out. The compromise > I came up with was to special case the batch size and pool that size since > that should be 99% of the allocations, when messages come in that are bigger > then this i just directly allocate memory from them and don't attempt to pool > it. > > But this means you can have a bunch of buffers in the free list (which > are always exactly batchSize) that use up all available memory. So if you get > a custom allocation size you need to toss out some pooled buffers to ensure > you don't exceed your memory limit. Let me know if that makes sense and isn't > totally insane.
I would be surprised if one sets the batch size and sends messages larger than the batch size. As you pointed out this would be rare. Does it make sense then to keep the max memory a soft limit and just allocate the large message and reduce complexity here. I am not recommending one way or the other though. - Sriram ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/17263/#review32800 ----------------------------------------------------------- On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/17263/ > ----------------------------------------------------------- > > (Updated Jan. 23, 2014, 8:54 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1227 > https://issues.apache.org/jira/browse/KAFKA-1227 > > > Repository: kafka > > > Description > ------- > > KAFKA-1227 New producer! > > > Diffs > ----- > > clients/build.sbt PRE-CREATION > clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/KafkaProducer.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/ProducerConfig.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/ProducerRecord.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/Metadata.java > PRE-CREATION > > clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java > PRE-CREATION > > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/Sender.java > PRE-CREATION > clients/src/main/java/kafka/clients/tools/ProducerPerformance.java > PRE-CREATION > clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION > clients/src/main/java/kafka/common/Cluster.java PRE-CREATION > clients/src/main/java/kafka/common/Configurable.java PRE-CREATION > clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION > clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION > clients/src/main/java/kafka/common/Metric.java PRE-CREATION > clients/src/main/java/kafka/common/Node.java PRE-CREATION > clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION > clients/src/main/java/kafka/common/Serializer.java PRE-CREATION > clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION > clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION > clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION > clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION > clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION > clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION > clients/src/main/java/kafka/common/errors/CorruptMessageException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/MessageTooLargeException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/NetworkException.java > PRE-CREATION > > clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/RetryableException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/TimeoutException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/UnknownServerException.java > PRE-CREATION > > clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MetricsReporter.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/QuotaViolationException.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Histogram.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Percentile.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Percentiles.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/SampledStat.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java > PRE-CREATION > clients/src/main/java/kafka/common/network/ByteBufferReceive.java > PRE-CREATION > clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION > clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION > clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION > clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION > clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION > clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION > clients/src/main/java/kafka/common/network/Send.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/SchemaException.java > PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION > clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION > clients/src/main/java/kafka/common/record/InvalidRecordException.java > PRE-CREATION > clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION > clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION > clients/src/main/java/kafka/common/record/Record.java PRE-CREATION > clients/src/main/java/kafka/common/record/Records.java PRE-CREATION > clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION > clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION > clients/src/main/java/kafka/common/requests/ResponseHeader.java > PRE-CREATION > clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION > clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION > clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION > clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION > clients/src/test/java/kafka/clients/common/network/SelectorTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/BufferPoolTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION > clients/src/test/java/kafka/clients/producer/MockProducerTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/RecordSendTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION > clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION > clients/src/test/java/kafka/common/metrics/JmxReporterTest.java > PRE-CREATION > clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION > clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java > PRE-CREATION > > clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java > PRE-CREATION > clients/src/test/java/kafka/common/record/MemoryRecordsTest.java > PRE-CREATION > clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION > clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java > PRE-CREATION > clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION > clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION > clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION > clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION > clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION > project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e > > Diff: https://reviews.apache.org/r/17263/diff/ > > > Testing > ------- > > > Thanks, > > Jay Kreps > >