> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote: > > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 150 > > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line150> > > > > I think what you meant here when you said "the callback will execute in > > the I/O thread of the producer and so should be reasonably fast." is that > > the callback should be implemented such that it returns immediately. This > > is not very obvious from the current description. > > Jay Kreps wrote: > Hmm, you don't think so? I will specifically mention blocking.
ok. You may want to update the callback interface with the same description. > On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote: > > clients/src/main/java/kafka/clients/producer/RecordSend.java, line 34 > > <https://reviews.apache.org/r/17263/diff/1/?file=436451#file436451line34> > > > > This seems a bit confusing. It would be useful to keep the usage of > > RecordSend in the callback and sync producer to be the same. Is there any > > reason that await throws on error when you have apis to check error? > > Jay Kreps wrote: > I think I am confused. The usage in callback and directly is the same, no? > > The question of whether to have await() throw an error or not is good. I > think that is how futures in java work. It also forces the error versus > making it possible to use the offset when there is an error. Basically I > think I wanted to avoid the mongodb getLastError() where errors are silent > unless you explicitly look for them. I was referring to the following usage - 1. Error need not have to be checked here ProducerRecord record = new ProducerRecord("the-topic", "key, "value"); RecordSend send = producer.send(myRecord, null).await(); print send.offset() 2. Error has to be checked ProducerRecord record = new ProducerRecord("the-topic", "key, "value"); producer.send(myRecord, new Callback() { public void onCompletion(RecordSend send) { try { if (send.hasError()) print send.getError() else print send.offset(); } catch(KafkaException e) { e.printStackTrace(); } } }); I was noting this inconsistency. > On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote: > > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, > > line 79 > > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line79> > > > > Is this exception safe? > > Jay Kreps wrote: > Hmm, I think what you are saying is that interrupting the thread may > cause it to leak memory? yes > On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote: > > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java, > > line 87 > > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line87> > > > > We can allocate a batch larger than the batchSize? > > Jay Kreps wrote: > Yeah. This isn't terribly self-explanatory. The idea here is that you > want to encourage medium sized batches to avoid small sends. So maybe you set > a batch size of 4k. If you get a 1k message, you allocate a 4k buffer and > append it and hope more messages show up. If you get a large message you just > allocate a buffer for it and send it right away. This let's us have no > artificial bound on message size, but handle small messages efficiently. Makes sense - Sriram ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/17263/#review32800 ----------------------------------------------------------- On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/17263/ > ----------------------------------------------------------- > > (Updated Jan. 23, 2014, 8:54 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1227 > https://issues.apache.org/jira/browse/KAFKA-1227 > > > Repository: kafka > > > Description > ------- > > KAFKA-1227 New producer! > > > Diffs > ----- > > clients/build.sbt PRE-CREATION > clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/KafkaProducer.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/ProducerConfig.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/ProducerRecord.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/Metadata.java > PRE-CREATION > > clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java > PRE-CREATION > > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/Sender.java > PRE-CREATION > clients/src/main/java/kafka/clients/tools/ProducerPerformance.java > PRE-CREATION > clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION > clients/src/main/java/kafka/common/Cluster.java PRE-CREATION > clients/src/main/java/kafka/common/Configurable.java PRE-CREATION > clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION > clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION > clients/src/main/java/kafka/common/Metric.java PRE-CREATION > clients/src/main/java/kafka/common/Node.java PRE-CREATION > clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION > clients/src/main/java/kafka/common/Serializer.java PRE-CREATION > clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION > clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION > clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION > clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION > clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION > clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION > clients/src/main/java/kafka/common/errors/CorruptMessageException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/MessageTooLargeException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/NetworkException.java > PRE-CREATION > > clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/RetryableException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/TimeoutException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/UnknownServerException.java > PRE-CREATION > > clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MetricsReporter.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/QuotaViolationException.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Histogram.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Percentile.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Percentiles.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/SampledStat.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java > PRE-CREATION > clients/src/main/java/kafka/common/network/ByteBufferReceive.java > PRE-CREATION > clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION > clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION > clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION > clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION > clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION > clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION > clients/src/main/java/kafka/common/network/Send.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/SchemaException.java > PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION > clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION > clients/src/main/java/kafka/common/record/InvalidRecordException.java > PRE-CREATION > clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION > clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION > clients/src/main/java/kafka/common/record/Record.java PRE-CREATION > clients/src/main/java/kafka/common/record/Records.java PRE-CREATION > clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION > clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION > clients/src/main/java/kafka/common/requests/ResponseHeader.java > PRE-CREATION > clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION > clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION > clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION > clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION > clients/src/test/java/kafka/clients/common/network/SelectorTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/BufferPoolTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION > clients/src/test/java/kafka/clients/producer/MockProducerTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/RecordSendTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION > clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION > clients/src/test/java/kafka/common/metrics/JmxReporterTest.java > PRE-CREATION > clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION > clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java > PRE-CREATION > > clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java > PRE-CREATION > clients/src/test/java/kafka/common/record/MemoryRecordsTest.java > PRE-CREATION > clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION > clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java > PRE-CREATION > clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION > clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION > clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION > clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION > clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION > project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e > > Diff: https://reviews.apache.org/r/17263/diff/ > > > Testing > ------- > > > Thanks, > > Jay Kreps > >