> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote: > > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 182 > > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line182> > > > > This could add a couple minutes startup for producers that send to > > several > > topics (such as mirror-makers) since metadata lookup (at startup) will > > be > > issued serially for each topic. > > > > Jay Kreps wrote: > Wait, why? Shouldn't metadata fetch be very fast?
Yes - faster than I thought. i.e., I originally thought the send blocks until a leader is elected for the topic but that does not seem to be the case. It returns as soon as the topic is created. In any event, the latencies can add up if you are sending for the first time to a lot of new topics. I'm not sure this matters though. > On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote: > > clients/src/main/java/kafka/clients/producer/internals/Sender.java, line 299 > > <https://reviews.apache.org/r/17263/diff/1/?file=436457#file436457line299> > > > > On retryable errors we should probably force a metadata refresh. This is > > separate from retries (which I don't see implemented in this patch). Are > > retries going to be in-built or is the client expected to handle > > retries? > > > > > > Jay Kreps wrote: > I don't think we have decided about retries. > > This is a good point about re-fetching metadata. I don't believe I fully > understand when metadata refetch should happen. if (Errors.forCode(errorCode).exception instanceof RetryableException) this.metadata.forceUpdate(); Otherwise, subsequent sends to this partition would remain in error (unnecessarily for retryable errors) until the next metadata refresh. > On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote: > > clients/src/main/java/kafka/clients/producer/internals/Sender.java, line 448 > > <https://reviews.apache.org/r/17263/diff/1/?file=436457#file436457line448> > > > > Why do we need a deque for inflight requests? Given that there can be at > > most one outstanding request a reference should have been sufficient - > > no? > > > > Jay Kreps wrote: > There can be only one request being written at any time but there can be > any number of in-flight requests. Yes, but in processReadyPartitions you filter out (by calling canSendMore) partitions destined for brokers that already have a request in the deque. I may be missing something but I don't see the deque growing > 1 (except that you currently allow a metadata request in as well). - Joel ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/17263/#review33033 ----------------------------------------------------------- On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/17263/ > ----------------------------------------------------------- > > (Updated Jan. 23, 2014, 8:54 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1227 > https://issues.apache.org/jira/browse/KAFKA-1227 > > > Repository: kafka > > > Description > ------- > > KAFKA-1227 New producer! > > > Diffs > ----- > > clients/build.sbt PRE-CREATION > clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/Callback.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/KafkaProducer.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/MockProducer.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/Partitioner.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/Producer.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/ProducerConfig.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/ProducerRecord.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/RecordSend.java PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/Metadata.java > PRE-CREATION > > clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java > PRE-CREATION > > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java > PRE-CREATION > clients/src/main/java/kafka/clients/producer/internals/Sender.java > PRE-CREATION > clients/src/main/java/kafka/clients/tools/ProducerPerformance.java > PRE-CREATION > clients/src/main/java/kafka/common/ByteSerialization.java PRE-CREATION > clients/src/main/java/kafka/common/Cluster.java PRE-CREATION > clients/src/main/java/kafka/common/Configurable.java PRE-CREATION > clients/src/main/java/kafka/common/Deserializer.java PRE-CREATION > clients/src/main/java/kafka/common/KafkaException.java PRE-CREATION > clients/src/main/java/kafka/common/Metric.java PRE-CREATION > clients/src/main/java/kafka/common/Node.java PRE-CREATION > clients/src/main/java/kafka/common/PartitionInfo.java PRE-CREATION > clients/src/main/java/kafka/common/Serializer.java PRE-CREATION > clients/src/main/java/kafka/common/StringSerialization.java PRE-CREATION > clients/src/main/java/kafka/common/TopicPartition.java PRE-CREATION > clients/src/main/java/kafka/common/config/AbstractConfig.java PRE-CREATION > clients/src/main/java/kafka/common/config/ConfigDef.java PRE-CREATION > clients/src/main/java/kafka/common/config/ConfigException.java PRE-CREATION > clients/src/main/java/kafka/common/errors/ApiException.java PRE-CREATION > clients/src/main/java/kafka/common/errors/CorruptMessageException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/MessageTooLargeException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/NetworkException.java > PRE-CREATION > > clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/RetryableException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/TimeoutException.java > PRE-CREATION > clients/src/main/java/kafka/common/errors/UnknownServerException.java > PRE-CREATION > > clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/CompoundStat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/JmxReporter.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/KafkaMetric.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/Measurable.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MeasurableStat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MetricConfig.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/Metrics.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/MetricsReporter.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/Quota.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/QuotaViolationException.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/Sensor.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/Stat.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Avg.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Count.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Histogram.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Max.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Min.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Percentile.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Percentiles.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Rate.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/SampledStat.java > PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/Total.java PRE-CREATION > clients/src/main/java/kafka/common/metrics/stats/WindowedStat.java > PRE-CREATION > clients/src/main/java/kafka/common/network/ByteBufferReceive.java > PRE-CREATION > clients/src/main/java/kafka/common/network/ByteBufferSend.java PRE-CREATION > clients/src/main/java/kafka/common/network/NetworkReceive.java PRE-CREATION > clients/src/main/java/kafka/common/network/NetworkSend.java PRE-CREATION > clients/src/main/java/kafka/common/network/Receive.java PRE-CREATION > clients/src/main/java/kafka/common/network/Selectable.java PRE-CREATION > clients/src/main/java/kafka/common/network/Selector.java PRE-CREATION > clients/src/main/java/kafka/common/network/Send.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/ApiKeys.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/Errors.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/ProtoUtils.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/Protocol.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/ArrayOf.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Field.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Schema.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/SchemaException.java > PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Struct.java PRE-CREATION > clients/src/main/java/kafka/common/protocol/types/Type.java PRE-CREATION > clients/src/main/java/kafka/common/record/CompressionType.java PRE-CREATION > clients/src/main/java/kafka/common/record/InvalidRecordException.java > PRE-CREATION > clients/src/main/java/kafka/common/record/LogEntry.java PRE-CREATION > clients/src/main/java/kafka/common/record/MemoryRecords.java PRE-CREATION > clients/src/main/java/kafka/common/record/Record.java PRE-CREATION > clients/src/main/java/kafka/common/record/Records.java PRE-CREATION > clients/src/main/java/kafka/common/requests/RequestHeader.java PRE-CREATION > clients/src/main/java/kafka/common/requests/RequestSend.java PRE-CREATION > clients/src/main/java/kafka/common/requests/ResponseHeader.java > PRE-CREATION > clients/src/main/java/kafka/common/utils/AbstractIterator.java PRE-CREATION > clients/src/main/java/kafka/common/utils/CopyOnWriteMap.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Crc32.java PRE-CREATION > clients/src/main/java/kafka/common/utils/KafkaThread.java PRE-CREATION > clients/src/main/java/kafka/common/utils/SystemTime.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Time.java PRE-CREATION > clients/src/main/java/kafka/common/utils/Utils.java PRE-CREATION > clients/src/test/java/kafka/clients/common/network/SelectorTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/BufferPoolTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/MetadataTest.java PRE-CREATION > clients/src/test/java/kafka/clients/producer/MockProducerTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/RecordSendTest.java > PRE-CREATION > clients/src/test/java/kafka/clients/producer/SenderTest.java PRE-CREATION > clients/src/test/java/kafka/common/config/ConfigDefTest.java PRE-CREATION > clients/src/test/java/kafka/common/metrics/JmxReporterTest.java > PRE-CREATION > clients/src/test/java/kafka/common/metrics/MetricsTest.java PRE-CREATION > clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java > PRE-CREATION > > clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java > PRE-CREATION > clients/src/test/java/kafka/common/record/MemoryRecordsTest.java > PRE-CREATION > clients/src/test/java/kafka/common/record/RecordTest.java PRE-CREATION > clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java > PRE-CREATION > clients/src/test/java/kafka/common/utils/MockTime.java PRE-CREATION > clients/src/test/java/kafka/test/MetricsBench.java PRE-CREATION > clients/src/test/java/kafka/test/Microbenchmarks.java PRE-CREATION > clients/src/test/java/kafka/test/MockSelector.java PRE-CREATION > clients/src/test/java/kafka/test/TestUtils.java PRE-CREATION > project/Build.scala 098e874b8a14067739e6cd1f0a9ab9597ae84b6e > > Diff: https://reviews.apache.org/r/17263/diff/ > > > Testing > ------- > > > Thanks, > > Jay Kreps > >