Gary,

So you still observed records-per-request-avg = 1.0 even with linger.ms = 100
or 1000?

It seems you are not using Kafka's ProducerPerformance but rather your own
ExploreProducer implementation. Could you elaborate a bit on how messages are
piped to this client?
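One thing worth checking: if the client blocks on the Future returned by each
send() before issuing the next one, at most one record is ever in the
accumulator, and records-per-request-avg stays pinned at 1.0 no matter what
linger.ms is. A stdlib-only sketch of the two calling patterns (FakeProducer
is a hypothetical stand-in for illustration, not the Kafka API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class SendPatterns {
    // Hypothetical stand-in for a producer whose send() queues records until
    // flush() fires them off as one request. NOT the Kafka API -- just a model.
    static class FakeProducer {
        private final List<CompletableFuture<Long>> pending = new ArrayList<>();
        private long nextOffset = 0;
        int maxBatchSeen = 0; // largest number of records sent in one request

        Future<Long> send(String record) {
            CompletableFuture<Long> future = new CompletableFuture<>();
            pending.add(future);
            return future;
        }

        void flush() { // completes everything accumulated so far as one request
            maxBatchSeen = Math.max(maxBatchSeen, pending.size());
            for (CompletableFuture<Long> f : pending) f.complete(nextOffset++);
            pending.clear();
        }
    }

    public static void main(String[] args) throws Exception {
        // Pattern 1: block on each Future before the next send().
        // Only one record is ever queued, so every request carries one record.
        FakeProducer blocking = new FakeProducer();
        for (int i = 0; i < 5; i++) {
            Future<Long> f = blocking.send("msg" + i);
            blocking.flush(); // the sender fires with whatever is queued: 1 record
            f.get();          // caller waits before producing the next record
        }
        System.out.println("blocking per send: max batch = " + blocking.maxBatchSeen);

        // Pattern 2: send everything first, then wait on the Futures.
        // Records accumulate, so one request can carry many records.
        FakeProducer batched = new FakeProducer();
        List<Future<Long>> futures = new ArrayList<>();
        for (int i = 0; i < 5; i++) futures.add(batched.send("msg" + i));
        batched.flush();
        for (Future<Long> f : futures) f.get();
        System.out.println("send then wait:    max batch = " + batched.maxBatchSeen);
    }
}
```

The first pattern prints a max batch of 1, the second a max batch of 5.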

Guozhang


On Sat, Dec 12, 2015 at 11:34 AM, Gary Gershon <g...@intermediasciences.com>
wrote:

> I’m moving this issue from Stack Overflow to the Apache Kafka Users List
>
>
> http://stackoverflow.com/questions/34213272/kafka-producer-0-9-performance-issue-with-small-messages
>
> There was some discussion on Stack Overflow, but I’m expecting the Apache
> list will be better able to help identify the root cause, since the problem
> does not appear to be specific to the Bluemix Message Hub service.  Thanks!
>
> -----
>
> We are observing very poor performance with a Java Kafka Producer 0.9
> client when sending small messages.  The messages are not being accumulated
> into a larger request batch and thus each small record is being sent
> separately.
>
> What is wrong with our client configuration?  Or is this some other issue?
>
> -----
>
> Using Kafka Client 0.9.0.0.  We did not see any related issues in the fixed
> or unresolved lists for the unreleased 0.9.0.1 or 0.9.1 versions, so we are
> focused on our client configuration and server instance.
>
> We understand that linger.ms should cause the client to accumulate records
> into a batch.
>
> We set linger.ms to 10 (and also tried 100 and 1000), but none of these
> resulted in the batch accumulating records.  With a record size of about 100
> bytes and a 16K batch buffer, we would have expected about 160 messages to
> be sent in a single request.
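That expectation is simple arithmetic (a trivial sketch; 16384 is the
batch.size from the configuration below, 100 bytes the approximate record
size from our test):

```java
public class BatchCapacity {
    public static void main(String[] args) {
        int batchSizeBytes = 16384; // producer batch.size (the 16K buffer)
        int recordBytes = 100;      // approximate serialized record size
        int expectedRecordsPerBatch = batchSizeBytes / recordBytes;
        System.out.println(expectedRecordsPerBatch); // prints 163, i.e. ~160 records
    }
}
```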
>
> The trace at the client seems to indicate that the partition may be full,
> despite having allocated a fresh Bluemix Message Hub (Kafka Server 0.9)
> service instance.  The test client is sending multiple messages in a loop
> with no other I/O.
>
> -----
>
> The log shows a repeating sequence with a suspect line: "**Waking up the
> sender since topic mytopic partition 0 is either full or getting a new
> batch**".
>
> The newly allocated partition should be essentially empty in our test case,
> so why would the producer client be getting a new batch?
>
> <pre>
> 2015-12-10 15:14:41,335 3677 [main] TRACE
> com.isllc.client.producer.ExploreProducer  - Sending record:
> Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test
> Message 00011 2015-12-10T15:14:41.335-05:00'
> 2015-12-10 15:14:41,336 3678 [main] TRACE
> org.apache.kafka.clients.producer.KafkaProducer  - Sending record
> ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af,
> value=[B@4923ab24 with callback null to topic mytopic partition 0
> 2015-12-10 15:14:41,336 3678 [main] TRACE
> org.apache.kafka.clients.producer.internals.RecordAccumulator  - Allocating
> a new 16384 byte message buffer for topic mytopic partition 0
> 2015-12-10 15:14:41,336 3678 [main] TRACE
> org.apache.kafka.clients.producer.KafkaProducer  - Waking up the sender
> since topic mytopic partition 0 is either full or getting a new batch
> 2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread |
> ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender
> - Nodes with data ready to send: [Node(0,
> kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]
> 2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread |
> ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender
> - Created 1 produce requests: [ClientRequest(expectResponse=true,
> callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963,
> request=RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer},
> body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
> lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]
> 2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread |
> ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender
> - Received produce response from node 0 with correlation id 11
> 2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread |
> ExploreProducer] TRACE
> org.apache.kafka.clients.producer.internals.RecordBatch  - Produced
> messages to topic-partition mytopic-0 with base offset offset 130 and
> error: null.
> 2015-12-10 15:14:41,412 3754 [main] TRACE
> com.isllc.client.producer.ExploreProducer  - Send returned metadata:
> Topic='mytopic', Partition=0, Offset=130
> 2015-12-10 15:14:41,412 3754 [main] TRACE
> com.isllc.client.producer.ExploreProducer  - Sending record:
> Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test
> Message 00012 2015-12-10T15:14:41.412-05:00'
>
> Log entries repeat like the above for each record sent
> </pre>
>
> ------
>
> We provided the following properties file:
>
> <pre>
> 2015-12-10 15:14:37,843 185  [main] INFO  com.isllc.client.AbstractClient
> - Properties retrieved from file for Kafka client: kafka-producer.properties
> 2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient
> -     acks=-1
> 2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient
> -     ssl.protocol=TLSv1.2
> 2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient
> -
>  key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
> 2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient
> -     client.id=ExploreProducer
> 2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient
> -     ssl.truststore.identification.algorithm=HTTPS
> 2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient
> -
>  value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
> 2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient
> -     ssl.truststore.password=changeit
> 2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient
> -     ssl.truststore.type=JKS
> 2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient
> -     ssl.enabled.protocols=TLSv1.2
> 2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient
> -
>  
> ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
> 2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient
> -     bootstrap.servers=
> kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,
> kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,
> kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,
> kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,
> kafka05-prod01.messagehub.services.us-south.bluemix.net:9094
> 2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient
> -     security.protocol=SASL_SSL
>
> Plus we added linger.ms=10 in code.
> </pre>
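For reference, a minimal stdlib-only sketch of combining file-based properties
with the programmatic linger.ms override described above (the property names
match the log; the loading mechanics here are an assumption, since
AbstractClient is our own code):

```java
import java.io.StringReader;
import java.util.Properties;

public class ProducerProps {
    public static void main(String[] args) throws Exception {
        // Abbreviated stand-in for kafka-producer.properties; the real file
        // also carries the SSL, serializer, and bootstrap.servers settings.
        String fileContents = "acks=-1\nclient.id=ExploreProducer\n";

        Properties props = new Properties();
        props.load(new StringReader(fileContents));

        // Added in code, as noted above: let the accumulator wait up to 10 ms
        // for more records before the sender ships the batch.
        props.setProperty("linger.ms", "10");

        System.out.println("linger.ms = " + props.getProperty("linger.ms"));
    }
}
```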
>
> -----
>
> The Kafka client logs the expanded/merged configuration, which confirms
> that the linger.ms setting took effect:
>
> <pre>
> 2015-12-10 15:14:37,970 312  [main] INFO
> org.apache.kafka.clients.producer.ProducerConfig  - ProducerConfig values:
>         compression.type = none
>         metric.reporters = []
>         metadata.max.age.ms = 300000
>         metadata.fetch.timeout.ms = 60000
>         reconnect.backoff.ms = 50
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         bootstrap.servers = [
> kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,
> kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,
> kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,
> kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,
> kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         buffer.memory = 33554432
>         timeout.ms = 30000
>         key.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         ssl.keystore.type = JKS
>         ssl.trustmanager.algorithm = PKIX
>         block.on.buffer.full = false
>         ssl.key.password = null
>         max.block.ms = 60000
>         sasl.kerberos.min.time.before.relogin = 60000
>         connections.max.idle.ms = 540000
>         ssl.truststore.password = [hidden]
>         max.in.flight.requests.per.connection = 5
>         metrics.num.samples = 2
>         client.id = ExploreProducer
>         ssl.endpoint.identification.algorithm = null
>         ssl.protocol = TLSv1.2
>         request.timeout.ms = 30000
>         ssl.provider = null
>         ssl.enabled.protocols = [TLSv1.2]
>         acks = -1
>         batch.size = 16384
>         ssl.keystore.location = null
>         receive.buffer.bytes = 32768
>         ssl.cipher.suites = null
>         ssl.truststore.type = JKS
>         security.protocol = SASL_SSL
>         retries = 0
>         max.request.size = 1048576
>         value.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
>         ssl.truststore.location =
> /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
>         ssl.keystore.password = null
>         ssl.keymanager.algorithm = SunX509
>         metrics.sample.window.ms = 30000
>         partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>         send.buffer.bytes = 131072
>         linger.ms = 10
> </pre>
> -----
>
> The Kafka metrics after sending 100 records:
>
> <pre>
> Duration for 100 sends 8787 ms. Sent 7687 bytes.
>     batch-size-avg = 109.87 [The average number of bytes sent per
> partition per-request.]
>     batch-size-max = 110.0 [The max number of bytes sent per partition
> per-request.]
>     buffer-available-bytes = 3.3554432E7 [The total amount of buffer
> memory that is not being used (either unallocated or in the free list).]
>     buffer-exhausted-rate = 0.0 [The average per-second number of record
> sends that are dropped due to buffer exhaustion]
>     buffer-total-bytes = 3.3554432E7 [The maximum amount of buffer memory
> the client can use (whether or not it is currently used).]
>     bufferpool-wait-ratio = 0.0 [The fraction of time an appender waits
> for space allocation.]
>     byte-rate = 291.8348916277093 []
>     compression-rate = 0.0 []
>     compression-rate-avg = 0.0 [The average compression rate of record
> batches.]
>     connection-close-rate = 0.0 [Connections closed per second in the
> window.]
>     connection-count = 2.0 [The current number of active connections.]
>     connection-creation-rate = 0.05180541884681138 [New connections
> established per second in the window.]
>     incoming-byte-rate = 10.342564641029007 []
>     io-ratio = 0.0038877559207471236 [The fraction of time the I/O thread
> spent doing I/O]
>     io-time-ns-avg = 353749.2840375587 [The average length of time for I/O
> per select call in nanoseconds.]
>     io-wait-ratio = 0.21531227995769162 [The fraction of time the I/O
> thread spent waiting.]
>     io-wait-time-ns-avg = 1.9591901192488264E7 [The average length of time
> the I/O thread spent waiting for a socket ready for reads or writes in
> nanoseconds.]
>     metadata-age = 8.096 [The age in seconds of the current producer
> metadata being used.]
>     network-io-rate = 5.2937784999213795 [The average number of network
> operations (reads or writes) on all connections per second.]
>     outgoing-byte-rate = 451.2298783403283 []
>     produce-throttle-time-avg = 0.0 [The average throttle time in ms]
>     produce-throttle-time-max = 0.0 [The maximum throttle time in ms]
>     record-error-rate = 0.0 [The average per-second number of record sends
> that resulted in errors]
>     record-queue-time-avg = 15.5 [The average time in ms record batches
> spent in the record accumulator.]
>     record-queue-time-max = 434.0 [The maximum time in ms record batches
> spent in the record accumulator.]
>     record-retry-rate = 0.0 []
>     record-send-rate = 2.65611304417116 [The average number of records
> sent per second.]
>     record-size-avg = 97.87 [The average record size]
>     record-size-max = 98.0 [The maximum record size]
>     records-per-request-avg = 1.0 [The average number of records per
> request.]
>     request-latency-avg = 0.0 [The average request latency in ms]
>     request-latency-max = 74.0 []
>     request-rate = 2.6468892499606897 [The average number of requests sent
> per second.]
>     request-size-avg = 42.0 [The average size of all requests in the
> window..]
>     request-size-max = 170.0 [The maximum size of any request sent in the
> window.]
>     requests-in-flight = 0.0 [The current number of in-flight requests
> awaiting a response.]
>     response-rate = 2.651196976060479 [The average number of responses
> received per second.]
>     select-rate = 10.989861465830819 [Number of times the I/O layer
> checked for new I/O to perform per second]
>     waiting-threads = 0.0 [The number of user threads blocked waiting for
> buffer memory to enqueue their records]
> </pre>
>
> Thanks
>
>
> Gary Gershon
> Principal, Intermedia Sciences LLC
> (908) 969-1119
> g...@intermediasciences.com
>
>
>
>
>


-- 
-- Guozhang
