Gary,

So you still observed records-per-request-avg = 1.0 with linger.ms = 100 or 1000?
It seems you are not using Kafka's ProducerPerformance but your own ExploreProducer implementation. Could you elaborate a bit on how messages are piped to this client?

Guozhang

On Sat, Dec 12, 2015 at 11:34 AM, Gary Gershon <g...@intermediasciences.com> wrote:

> I’m moving this issue from Stack Overflow to the Apache Kafka Users List:
>
> http://stackoverflow.com/questions/34213272/kafka-producer-0-9-performance-issue-with-small-messages
>
> There was some discussion on Stack Overflow, but I’m expecting the Apache
> List will be better able to help in identifying the root cause since it
> appears not necessarily to be a BlueMix Message Hub issue. Thanks!
>
> -----
>
> We are observing very poor performance with a Java Kafka Producer 0.9
> client when sending small messages. The messages are not being accumulated
> into a larger request batch and thus each small record is being sent
> separately.
>
> What is wrong with our client configuration? Or is this some other issue?
>
> -----
>
> Using Kafka Client 0.9.0.0. We did not see any related postings in the
> Kafka unreleased 9.0.1 or 9.1 fixed or unresolved lists, so we are focused
> on our client configuration and server instance.
>
> We understand the linger.ms should cause the client to accumulate records
> into a batch.
>
> We set linger.ms to 10 (and also tried 100 and 1000) but these did not
> result in the batch accumulating records. With a record size of about 100
> bytes and a request buffer size of 16K, we would have expected about 160
> messages to be sent in a single request.
>
> The trace at the client seems to indicate that the partition may be full,
> despite having allocated a fresh Bluemix Messaging Hub (Kafka Server 0.9)
> service instance. The test client is sending multiple messages in a loop
> with no other I/O.
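For reference, the batch fill expected in the paragraph above can be checked with a few lines of arithmetic. This is only a sketch: `BatchFillEstimate` and `recordsPerBatch` are made-up names, not part of the Kafka client. The 16384-byte figure is the batch.size reported in the configuration dump further down, and the per-record sizes (about 100 bytes of payload, about 110 bytes as sent) come from the figures in this message. Any per-batch overhead is ignored.

```java
// Rough estimate of how many records fit into one producer batch.
// BatchFillEstimate is a hypothetical helper, not a Kafka API.
final class BatchFillEstimate {

    // Number of whole records of the given size that fit into one batch buffer.
    static int recordsPerBatch(int batchSizeBytes, int bytesPerRecord) {
        if (bytesPerRecord <= 0) {
            throw new IllegalArgumentException("bytesPerRecord must be positive");
        }
        return batchSizeBytes / bytesPerRecord;
    }

    public static void main(String[] args) {
        int batchSize = 16384; // batch.size as reported in the configuration dump

        // About 100 bytes per record, the figure used in the message.
        System.out.println(recordsPerBatch(batchSize, 100)); // prints 163

        // About 110 bytes per record as sent (batch-size-max = 110.0).
        System.out.println(recordsPerBatch(batchSize, 110)); // prints 148
    }
}
```

Either way the estimate is in the range of 150 to 160 records per request, so an observed average of 1.0 is far from what the buffer size alone would allow.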
>
> -----
>
> The log shows a repeating sequence with a suspect line: "**Waking up the
> sender since topic mytopic partition 0 is either full or getting a new
> batch**".
>
> So the newly allocated partition should be essentially empty in our test
> case, thus why would the producer client be getting a new batch?
>
> <pre>
> 2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 2015-12-10T15:14:41.335-05:00'
> 2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - Sending record ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, value=[B@4923ab24 with callback null to topic mytopic partition 0
> 2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator - Allocating a new 16384 byte message buffer for topic mytopic partition 0
> 2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - Waking up the sender since topic mytopic partition 0 is either full or getting a new batch
> 2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Nodes with data ready to send: [Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]
> 2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request=RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]
> 2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Received produce response from node 0 with correlation id 11
> 2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch - Produced messages to topic-partition mytopic-0 with base offset offset 130 and error: null.
> 2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - Send returned metadata: Topic='mytopic', Partition=0, Offset=130
> 2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 2015-12-10T15:14:41.412-05:00'
>
> Log entries repeat like the above for each record sent
> </pre>
>
> ------
>
> We provided the following properties file:
>
> <pre>
> 2015-12-10 15:14:37,843 185 [main] INFO com.isllc.client.AbstractClient - Properties retrieved from file for Kafka client: kafka-producer.properties
> 2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - acks=-1
> 2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - ssl.protocol=TLSv1.2
> 2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
> 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - client.id=ExploreProducer
> 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.identification.algorithm=HTTPS
> 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
> 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.password=changeit
> 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.type=JKS
> 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.enabled.protocols=TLSv1.2
> 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
> 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - bootstrap.servers=
>     kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,
>     kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,
>     kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,
>     kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,
>     kafka05-prod01.messagehub.services.us-south.bluemix.net:9094
> 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - security.protocol=SASL_SSL
>
> Plus we added linger.ms=10 in code.
> </pre>
>
> -----
>
> The Kafka Client shows the expanded/merged configuration list (and
> displaying the linger.ms setting):
>
> <pre>
> 2015-12-10 15:14:37,970 312 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
> compression.type = none
> metric.reporters = []
> metadata.max.age.ms = 300000
> metadata.fetch.timeout.ms = 60000
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> bootstrap.servers = [
>     kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,
>     kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,
>     kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,
>     kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,
>     kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> buffer.memory = 33554432
> timeout.ms = 30000
> key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> block.on.buffer.full = false
> ssl.key.password = null
> max.block.ms = 60000
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> ssl.truststore.password = [hidden]
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> client.id = ExploreProducer
> ssl.endpoint.identification.algorithm = null
> ssl.protocol = TLSv1.2
> request.timeout.ms = 30000
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2]
> acks = -1
> batch.size = 16384
> ssl.keystore.location = null
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = SASL_SSL
> retries = 0
> max.request.size = 1048576
> value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
> send.buffer.bytes = 131072
> linger.ms = 10
> </pre>
> -----
>
> The Kafka metrics after sending 100 records:
>
> <pre>
> Duration for 100 sends 8787 ms. Sent 7687 bytes.
> batch-size-avg = 109.87 [The average number of bytes sent per partition per-request.]
> batch-size-max = 110.0 [The max number of bytes sent per partition per-request.]
> buffer-available-bytes = 3.3554432E7 [The total amount of buffer memory that is not being used (either unallocated or in the free list).]
> buffer-exhausted-rate = 0.0 [The average per-second number of record sends that are dropped due to buffer exhaustion]
> buffer-total-bytes = 3.3554432E7 [The maximum amount of buffer memory the client can use (whether or not it is currently used).]
> bufferpool-wait-ratio = 0.0 [The fraction of time an appender waits for space allocation.]
> byte-rate = 291.8348916277093 []
> compression-rate = 0.0 []
> compression-rate-avg = 0.0 [The average compression rate of record batches.]
> connection-close-rate = 0.0 [Connections closed per second in the window.]
> connection-count = 2.0 [The current number of active connections.]
> connection-creation-rate = 0.05180541884681138 [New connections established per second in the window.]
> incoming-byte-rate = 10.342564641029007 []
> io-ratio = 0.0038877559207471236 [The fraction of time the I/O thread spent doing I/O]
> io-time-ns-avg = 353749.2840375587 [The average length of time for I/O per select call in nanoseconds.]
> io-wait-ratio = 0.21531227995769162 [The fraction of time the I/O thread spent waiting.]
> io-wait-time-ns-avg = 1.9591901192488264E7 [The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.]
> metadata-age = 8.096 [The age in seconds of the current producer metadata being used.]
> network-io-rate = 5.2937784999213795 [The average number of network operations (reads or writes) on all connections per second.]
> outgoing-byte-rate = 451.2298783403283 []
> produce-throttle-time-avg = 0.0 [The average throttle time in ms]
> produce-throttle-time-max = 0.0 [The maximum throttle time in ms]
> record-error-rate = 0.0 [The average per-second number of record sends that resulted in errors]
> record-queue-time-avg = 15.5 [The average time in ms record batches spent in the record accumulator.]
> record-queue-time-max = 434.0 [The maximum time in ms record batches spent in the record accumulator.]
> record-retry-rate = 0.0 []
> record-send-rate = 2.65611304417116 [The average number of records sent per second.]
> record-size-avg = 97.87 [The average record size]
> record-size-max = 98.0 [The maximum record size]
> records-per-request-avg = 1.0 [The average number of records per request.]
> request-latency-avg = 0.0 [The average request latency in ms]
> request-latency-max = 74.0 []
> request-rate = 2.6468892499606897 [The average number of requests sent per second.]
> request-size-avg = 42.0 [The average size of all requests in the window..]
> request-size-max = 170.0 [The maximum size of any request sent in the window.]
> requests-in-flight = 0.0 [The current number of in-flight requests awaiting a response.]
> response-rate = 2.651196976060479 [The average number of responses received per second.]
> select-rate = 10.989861465830819 [Number of times the I/O layer checked for new I/O to perform per second]
> waiting-threads = 0.0 [The number of user threads blocked waiting for buffer memory to enqueue their records]
> </pre>
>
> Thanks
>
>
> Gary Gershon
> Principal, Intermedia Sciences LLC
> (908) 969-1119
> g...@intermediasciences.com
>

--
-- Guozhang
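One reading of the trace, offered as a hypothesis rather than a diagnosis: the main thread logs "Send returned metadata" only after the produce response has arrived, and the next "Sending record" follows immediately. That is the pattern you would see if the client waits on the Future returned by KafkaProducer.send() before sending the next record. In that case the accumulator never holds more than one record, and linger.ms only adds delay to each send. The sketch below is plain arithmetic with made-up names (`SendTimingEstimate` and its methods are not Kafka APIs). The 12 ms and 64 ms figures are read off the trace timestamps (15:14:41,336 enqueue, 15:14:41,348 request created, 15:14:41,412 response received), and the blocking behaviour itself is an assumption.

```java
// Back-of-the-envelope timing for a producer that waits for each send to complete.
// SendTimingEstimate is a hypothetical helper, not a Kafka API.
final class SendTimingEstimate {

    // Total time when every record waits in the accumulator (queueMs) and then
    // for its own request/response round trip (roundTripMs) before the next send.
    static long blockingDurationMs(int records, long queueMs, long roundTripMs) {
        return records * (queueMs + roundTripMs);
    }

    // Observed mean time per send, from a measured total duration.
    static double averagePerSendMs(long totalMs, int records) {
        return (double) totalMs / records;
    }

    public static void main(String[] args) {
        // 12 ms between enqueue and request creation, 64 ms until the response,
        // both taken from the trace; 100 records as in the reported test.
        System.out.println(blockingDurationMs(100, 12, 64)); // prints 7600

        // Reported: "Duration for 100 sends 8787 ms".
        System.out.println(averagePerSendMs(8787, 100));
    }
}
```

The estimate of 7600 ms is the same order of magnitude as the reported 8787 ms for 100 sends, which is at least consistent with one round trip per record. It does not rule out other causes, so seeing how ExploreProducer calls send() would settle it.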