> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 150
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line150>
> >
> >     I think what you meant here when you said "the callback will execute in 
> > the I/O thread of the producer and so should be reasonably fast." is that 
> > the callback should be implemented such that it returns immediately. This 
> > is not very obvious from the current description.
> 
> Jay Kreps wrote:
>     Hmm, you don't think so? I will specifically mention blocking.

ok. You may want to update the callback interface with the same description.
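
To make the "no blocking in the callback" point concrete, here is a rough sketch of how a user could keep the I/O thread free. It is not code from the patch: CompletionCallback is a hypothetical stand-in for the Callback interface (simplified to take just an offset), and HandOffCallback and the worker executor are names I made up for illustration.

```java
import java.util.concurrent.ExecutorService;

// Hypothetical stand-in for the producer's callback interface (an assumption, simplified).
interface CompletionCallback {
    void onCompletion(long offset);
}

// Wraps a slow callback so that the I/O thread only enqueues work and returns.
final class HandOffCallback implements CompletionCallback {
    private final ExecutorService worker;
    private final CompletionCallback slowWork;

    HandOffCallback(ExecutorService worker, CompletionCallback slowWork) {
        this.worker = worker;
        this.slowWork = slowWork;
    }

    @Override
    public void onCompletion(long offset) {
        // Called on the I/O thread: hand the work to another thread, never block here.
        worker.execute(() -> slowWork.onCompletion(offset));
    }
}
```

The slow part (logging to disk, calling another service, and so on) then runs on the worker thread. This assumes the executor's queue is unbounded, as with Executors.newSingleThreadExecutor(), so that execute() itself does not block.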


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/RecordSend.java, line 34
> > <https://reviews.apache.org/r/17263/diff/1/?file=436451#file436451line34>
> >
> >     This seems a bit confusing. It would be useful to keep the usage of 
> > RecordSend in the callback and sync producer to be the same. Is there any 
> > reason that await throws on error when you have apis to check error?
> 
> Jay Kreps wrote:
>     I think I am confused. The usage in callback and directly is the same, no?
>     
>     The question of whether to have await() throw an error or not is good. I 
> think that is how futures in java work. It also forces the error versus 
> making it possible to use the offset when there is an error. Basically I 
> think I wanted to avoid the mongodb getLastError() where errors are silent 
> unless you explicitly look for them.

I was referring to the following usage - 

1. The error does not have to be checked here
ProducerRecord record = new ProducerRecord("the-topic", "key", "value");
RecordSend send = producer.send(record, null).await();
System.out.println(send.offset());

2. The error has to be checked
ProducerRecord record = new ProducerRecord("the-topic", "key", "value");
producer.send(record, new Callback() {
    public void onCompletion(RecordSend send) {
        try {
            if (send.hasError())
                System.out.println(send.getError());
            else
                System.out.println(send.offset());
        } catch (KafkaException e) {
            e.printStackTrace();
        }
    }
});

I was noting this inconsistency.
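
A rough sketch of the two behaviours side by side, in case it helps the discussion. SendResult is a hypothetical stand-in for RecordSend; its factory methods and accessors are my assumptions, not the API in the patch, and the result is already complete here, so await() does not actually wait.

```java
// Hypothetical stand-in for RecordSend; names and behaviour are assumptions.
final class SendResult {
    private final long offset;
    private final RuntimeException error; // null when the send succeeded

    private SendResult(long offset, RuntimeException error) {
        this.offset = offset;
        this.error = error;
    }

    static SendResult success(long offset) {
        return new SendResult(offset, null);
    }

    static SendResult failure(RuntimeException error) {
        return new SendResult(-1L, error);
    }

    // Callback-style access: the caller has to ask whether the send failed.
    boolean hasError() { return error != null; }
    RuntimeException error() { return error; }
    long offset() { return offset; }

    // Blocking-style access: a failed send surfaces as an exception,
    // so the caller cannot read the offset without noticing the error.
    SendResult await() {
        if (error != null)
            throw error;
        return this;
    }
}
```

With this shape, style 1 never needs hasError() because await() throws, while style 2 never sees an exception and must call hasError() explicitly.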
        


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, 
> > line 79
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line79>
> >
> >     Is this exception safe?
> 
> Jay Kreps wrote:
>     Hmm, I think what you are saying is that interrupting the thread may 
> cause it to leak memory?

yes
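
Roughly what I have in mind, as a sketch only. SimplePool is a hypothetical pool, not the BufferPool in the patch, and it assumes the allocator accumulates memory piecemeal while it waits. The point is that an interrupt during the wait must hand back whatever was already accumulated.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical pool used only to illustrate the interrupt-safety concern.
final class SimplePool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition moreMemory = lock.newCondition();
    private long available;

    SimplePool(long total) {
        this.available = total;
    }

    // Blocks until size bytes are reserved; reserves nothing if interrupted.
    void reserve(long size) throws InterruptedException {
        lock.lock();
        try {
            long accumulated = 0;
            boolean done = false;
            try {
                while (accumulated < size) {
                    long take = Math.min(available, size - accumulated);
                    available -= take;
                    accumulated += take;
                    if (accumulated < size)
                        moreMemory.await(); // may throw InterruptedException
                }
                done = true;
            } finally {
                if (!done) {
                    // Return the partial reservation so an interrupt cannot leak memory.
                    available += accumulated;
                    moreMemory.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void release(long size) {
        lock.lock();
        try {
            available += size;
            moreMemory.signalAll();
        } finally {
            lock.unlock();
        }
    }

    long available() {
        lock.lock();
        try {
            return available;
        } finally {
            lock.unlock();
        }
    }
}
```

Without the inner finally block, a thread interrupted in await() would exit with part of the pool still subtracted from available, and that memory would never come back.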


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java,
> >  line 87
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line87>
> >
> >     We can allocate a batch larger than the batchSize?
> 
> Jay Kreps wrote:
>     Yeah. This isn't terribly self-explanatory. The idea here is that you 
> want to encourage medium sized batches to avoid small sends. So maybe you set 
> a batch size of 4k. If you get a 1k message, you allocate a 4k buffer and 
> append it and hope more messages show up. If you get a large message you just 
> allocate a buffer for it and send it right away. This lets us have no 
> artificial bound on message size, but handle small messages efficiently.

Makes sense
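
For the record, my understanding of the sizing rule as a small sketch. BatchSizing and allocationSize are hypothetical names, not the RecordAccumulator code.

```java
// Hypothetical helper that only illustrates the sizing rule discussed above.
final class BatchSizing {
    private BatchSizing() {}

    // Size of the buffer to allocate for a new batch whose first record needs recordSize bytes.
    static int allocationSize(int batchSize, int recordSize) {
        if (batchSize <= 0 || recordSize < 0)
            throw new IllegalArgumentException("batchSize must be positive and recordSize non-negative");
        // A small record gets a full batch-sized buffer so later records can join it;
        // an oversized record gets a buffer of exactly its own size.
        return Math.max(batchSize, recordSize);
    }
}
```

With a 4k batch size, a 1k message gets a 4096-byte buffer, and a 10000-byte message gets a 10000-byte buffer that is already full and can be sent right away.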


- Sriram


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32800
-----------------------------------------------------------


On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17263/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2014, 8:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1227
>     https://issues.apache.org/jira/browse/KAFKA-1227
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1227 New producer!
> 
> 
> Diffs
> -----
> 
>   clients/build.sbt PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/KafkaProducer.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerConfig.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/ProducerRecord.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/BufferPool.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Metadata.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/producer/internals/Sender.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/clients/tools/ProducerPerformance.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Cluster.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Configurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Metric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Node.java PRE-CREATION 
>   clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION 
>   clients/src/main/java/kafka/common/Serializer.java PRE-CREATION 
>   clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION 
>   clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION 
>   clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/CorruptMessageException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/MessageTooLargeException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/NetworkException.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/RetryableException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/TimeoutException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/errors/UnknownServerException.java 
> PRE-CREATION 
>   
> clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java
>  PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/MetricsReporter.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/QuotaViolationException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Histogram.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentile.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Percentiles.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/SampledStat.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION 
>   clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferReceive.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION 
>   clients/src/main/java/kafka/common/network/Send.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/SchemaException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION 
>   clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/InvalidRecordException.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Record.java PRE-CREATION 
>   clients/src/main/java/kafka/common/record/Records.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION 
>   clients/src/main/java/kafka/common/requests/ResponseHeader.java 
> PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION 
>   clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/common/network/SelectorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/BufferPoolTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/MockProducerTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/RecordSendTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/JmxReporterTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java 
> PRE-CREATION 
>   
> clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
>  PRE-CREATION 
>   clients/src/test/java/kafka/common/record/MemoryRecordsTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java 
> PRE-CREATION 
>   clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION 
>   clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION 
>   clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION 
>   clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION 
>   project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e 
> 
> Diff: https://reviews.apache.org/r/17263/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>
